From fc30507024be6a9e2e4722e8668f7c2d7625c75b Mon Sep 17 00:00:00 2001 From: vam Date: Fri, 10 Mar 2017 17:09:00 -0800 Subject: [PATCH] Reconcile RetrySettings & Clock Reconcile RetrySettings in GAX with RetryParams in google-cloud-java https://github.com/GoogleCloudPlatform/google-cloud-java/issues/1574 Reconcile NanoClock in GAX with Clock in google-cloud-java https://github.com/GoogleCloudPlatform/google-cloud-java/issues/1575 --- .../google/api/gax/core/DefaultNanoClock.java | 61 +++++ .../api/gax/{grpc => core}/NanoClock.java | 7 +- .../google/api/gax/core/RetrySettings.java | 32 ++- .../com/google/api/gax/core/SystemClock.java | 58 +++++ .../google/api/gax/grpc/RetryingCallable.java | 240 ++++++------------ .../google/api/gax/grpc/UnaryCallable.java | 6 +- .../gax/retrying/AbstractRetryHandler.java | 137 ++++++++++ .../api/gax/retrying/DirectRetryHandler.java | 81 ++++++ .../gax/retrying/RetryAttemptSettings.java | 106 ++++++++ .../google/api/gax/retrying/RetryFuture.java | 57 +++++ .../api/gax/retrying/RetryFutureImpl.java | 175 +++++++++++++ .../google/api/gax/retrying/RetryHandler.java | 101 ++++++++ .../gax/retrying/ScheduledRetryHandler.java | 87 +++++++ .../google/api/gax/grpc/CancellationTest.java | 85 ++++--- .../google/api/gax/grpc/FakeNanoClock.java | 8 +- .../api/gax/grpc/RecordingScheduler.java | 68 +++-- .../api/gax/grpc/UnaryCallableTest.java | 129 +++++++--- .../retrying/AbstractRetryHandlerTest.java | 140 ++++++++++ .../gax/retrying/DirectRetryHandlerTest.java} | 21 +- .../api/gax/retrying/FailingCallable.java | 72 ++++++ .../retrying/ScheduledRetryHandlerTest.java | 118 +++++++++ version.txt | 2 +- 22 files changed, 1504 insertions(+), 287 deletions(-) create mode 100644 src/main/java/com/google/api/gax/core/DefaultNanoClock.java rename src/main/java/com/google/api/gax/{grpc => core}/NanoClock.java (92%) create mode 100644 src/main/java/com/google/api/gax/core/SystemClock.java create mode 100644 src/main/java/com/google/api/gax/retrying/AbstractRetryHandler.java create mode 100644 src/main/java/com/google/api/gax/retrying/DirectRetryHandler.java create mode 100644 src/main/java/com/google/api/gax/retrying/RetryAttemptSettings.java create mode 100644 src/main/java/com/google/api/gax/retrying/RetryFuture.java create mode 100644 src/main/java/com/google/api/gax/retrying/RetryFutureImpl.java create mode 100644 src/main/java/com/google/api/gax/retrying/RetryHandler.java create mode 100644 src/main/java/com/google/api/gax/retrying/ScheduledRetryHandler.java create mode 100644 src/test/java/com/google/api/gax/retrying/AbstractRetryHandlerTest.java rename src/{main/java/com/google/api/gax/grpc/DefaultNanoClock.java => test/java/com/google/api/gax/retrying/DirectRetryHandlerTest.java} (78%) create mode 100644 src/test/java/com/google/api/gax/retrying/FailingCallable.java create mode 100644 src/test/java/com/google/api/gax/retrying/ScheduledRetryHandlerTest.java diff --git a/src/main/java/com/google/api/gax/core/DefaultNanoClock.java b/src/main/java/com/google/api/gax/core/DefaultNanoClock.java new file mode 100644 index 000000000..38adf29b2 --- /dev/null +++ b/src/main/java/com/google/api/gax/core/DefaultNanoClock.java @@ -0,0 +1,61 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.core; + +import java.io.ObjectStreamException; +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +/** Default implementation of the NanoClock interface, using call to System.nanoTime(). */ +public final class DefaultNanoClock implements NanoClock, Serializable { + + private static final NanoClock DEFAULT_CLOCK = new DefaultNanoClock(); + private static final long serialVersionUID = 5541462688633944865L; + + public static NanoClock getDefaultClock() { + return DEFAULT_CLOCK; + } + + private DefaultNanoClock() {} + + @Override + public final long nanoTime() { + return System.nanoTime(); + } + + @Override + public final long millisTime() { + return TimeUnit.MILLISECONDS.convert(nanoTime(), TimeUnit.NANOSECONDS); + } + + private Object readResolve() throws ObjectStreamException { + return DEFAULT_CLOCK; + } +} diff --git a/src/main/java/com/google/api/gax/grpc/NanoClock.java b/src/main/java/com/google/api/gax/core/NanoClock.java similarity index 92% rename from src/main/java/com/google/api/gax/grpc/NanoClock.java rename to src/main/java/com/google/api/gax/core/NanoClock.java index 6f62b42de..e7f01fbaf 100644 --- a/src/main/java/com/google/api/gax/grpc/NanoClock.java +++ b/src/main/java/com/google/api/gax/core/NanoClock.java @@ -27,7 +27,7 @@ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -package com.google.api.gax.grpc; +package com.google.api.gax.core; /** * An interface for getting the current value of a high-resolution time source, in nanoseconds. @@ -43,4 +43,9 @@ public interface NanoClock { * Returns the current value of this clock's high-resolution time source, in nanoseconds. */ long nanoTime(); + + /** + * Returns the current value of this clock's high-resolution time source, in milliseconds. + */ + long millisTime(); } diff --git a/src/main/java/com/google/api/gax/core/RetrySettings.java b/src/main/java/com/google/api/gax/core/RetrySettings.java index d9406b88e..09415e0ce 100644 --- a/src/main/java/com/google/api/gax/core/RetrySettings.java +++ b/src/main/java/com/google/api/gax/core/RetrySettings.java @@ -30,6 +30,7 @@ package com.google.api.gax.core; import com.google.auto.value.AutoValue; +import java.io.Serializable; import org.joda.time.Duration; /** @@ -64,7 +65,9 @@ * is computed which will terminate the call when the total time is up. */ @AutoValue -public abstract class RetrySettings { +public abstract class RetrySettings implements Serializable { + + private static final long serialVersionUID = 8258475264439710899L; /** * The TotalTimeout parameter has ultimate control over how long the logic should keep trying the @@ -91,6 +94,13 @@ public abstract class RetrySettings { */ public abstract Duration getMaxRetryDelay(); + /** + * MaxAttempts defines the maximum number of attempts to perform. If number of attempts reaches + * this limit the logic will give up retrying even if the total retry time is still lower than + * TotalTimeout. + */ + public abstract int getMaxAttempts(); + /** * The InitialRpcTimeout parameter controls the timeout for the initial RPC. Subsequent calls will * use this value adjusted according to the RpcTimeoutMultiplier. @@ -110,7 +120,7 @@ public abstract class RetrySettings { public abstract Duration getMaxRpcTimeout(); public static Builder newBuilder() { - return new AutoValue_RetrySettings.Builder(); + return new AutoValue_RetrySettings.Builder().setMaxAttempts(0); } public Builder toBuilder() { @@ -150,6 +160,13 @@ public abstract static class Builder { */ public abstract Builder setMaxRetryDelay(Duration maxDelay); + /** + * MaxAttempts defines the maximum number of attempts to perform. If number of attempts reaches + * this limit the logic will give up retrying even if the total retry time is still lower than + * TotalTimeout. + */ + public abstract Builder setMaxAttempts(int maxAttempts); + /** * The InitialRpcTimeout parameter controls the timeout for the initial RPC. Subsequent calls * will use this value adjusted according to the RpcTimeoutMultiplier. @@ -188,6 +205,13 @@ public abstract static class Builder { */ public abstract double getRetryDelayMultiplier(); + /** + * MaxAttempts defines the maximum number of attempts to perform. If number of attempts reaches + * this limit the logic will give up retrying even if the total retry time is still lower than + * TotalTimeout. + */ + public abstract int getMaxAttempts(); + /** * The MaxRetryDelay puts a limit on the value of the retry delay, so that the * RetryDelayMultiplier can't increase the retry delay higher than this amount. @@ -228,6 +252,9 @@ public RetrySettings build() { if (params.getMaxRetryDelay().compareTo(params.getInitialRetryDelay()) < 0) { throw new IllegalStateException("max retry delay must not be shorter than initial delay"); } + if (params.getMaxAttempts() < 0) { + throw new IllegalStateException("max attempts must be non-negative"); + } if (params.getInitialRpcTimeout().getMillis() < 0) { throw new IllegalStateException("initial rpc timeout must not be negative"); } @@ -253,6 +280,7 @@ public RetrySettings.Builder merge(RetrySettings.Builder newSettings) { if (newSettings.getMaxRetryDelay() != null) { setMaxRetryDelay(newSettings.getMaxRetryDelay()); } + setMaxAttempts(newSettings.getMaxAttempts()); if (newSettings.getInitialRpcTimeout() != null) { setInitialRpcTimeout(newSettings.getInitialRpcTimeout()); } diff --git a/src/main/java/com/google/api/gax/core/SystemClock.java b/src/main/java/com/google/api/gax/core/SystemClock.java new file mode 100644 index 000000000..22a4e87c9 --- /dev/null +++ b/src/main/java/com/google/api/gax/core/SystemClock.java @@ -0,0 +1,58 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.google.api.gax.core; + +import java.io.ObjectStreamException; +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +public final class SystemClock implements NanoClock, Serializable { + private static final long serialVersionUID = -6019259882852183285L; + private static final NanoClock DEFAULT_CLOCK = new SystemClock(); + + public static NanoClock getDefaultClock() { + return DEFAULT_CLOCK; + } + + @Override + public long nanoTime() { + return TimeUnit.NANOSECONDS.convert(millisTime(), TimeUnit.MILLISECONDS); + } + + @Override + public long millisTime() { + return System.currentTimeMillis(); + } + + private Object readResolve() throws ObjectStreamException { + return DEFAULT_CLOCK; + } +} diff --git a/src/main/java/com/google/api/gax/grpc/RetryingCallable.java b/src/main/java/com/google/api/gax/grpc/RetryingCallable.java index 811f0abd1..72fe3e3c7 100644 --- a/src/main/java/com/google/api/gax/grpc/RetryingCallable.java +++ b/src/main/java/com/google/api/gax/grpc/RetryingCallable.java @@ -29,60 +29,55 @@ */ package com.google.api.gax.grpc; -import com.google.api.gax.core.AbstractApiFuture; import com.google.api.gax.core.ApiFuture; -import com.google.api.gax.core.ApiFutureCallback; -import com.google.api.gax.core.ApiFutures; +import com.google.api.gax.core.NanoClock; import com.google.api.gax.core.RetrySettings; +import com.google.api.gax.core.internal.ApiFutureToListenableFuture; +import com.google.api.gax.retrying.RetryAttemptSettings; +import com.google.api.gax.retrying.RetryFuture; +import com.google.api.gax.retrying.ScheduledRetryHandler; import com.google.common.base.Preconditions; import io.grpc.CallOptions; -import io.grpc.Status; -import java.util.concurrent.CancellationException; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadLocalRandom; +import io.grpc.Status.Code; +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; import org.joda.time.Duration; /** - * Implements the retry and timeout functionality used in {@link UnaryCallable}. The behavior is - * controlled by the given {@link RetrySettings}. + * Implements the retry and timeout functionality used in {@link UnaryCallable}. + * + * The behavior is controlled by the given {@link RetrySettings}. */ class RetryingCallable implements FutureCallable { - // Duration to sleep on if the error is DEADLINE_EXCEEDED. static final Duration DEADLINE_SLEEP_DURATION = Duration.millis(1); private final FutureCallable callable; private final RetrySettings retryParams; - private final UnaryCallable.Scheduler executor; + private final ScheduledExecutorService scheduler; private final NanoClock clock; RetryingCallable( FutureCallable callable, RetrySettings retrySettings, - UnaryCallable.Scheduler executor, + ScheduledExecutorService scheduler, NanoClock clock) { this.callable = Preconditions.checkNotNull(callable); this.retryParams = Preconditions.checkNotNull(retrySettings); - this.executor = executor; + this.scheduler = scheduler; this.clock = clock; } @Override public ApiFuture futureCall(RequestT request, CallContext context) { - RetryingResultFuture resultFuture = new RetryingResultFuture(); - context = getCallContextWithDeadlineAfter(context, retryParams.getTotalTimeout()); - Retryer retryer = - new Retryer( - request, - context, - resultFuture, - retryParams.getInitialRetryDelay(), - retryParams.getInitialRpcTimeout(), - null); - resultFuture.issueCall(request, context, retryer); - return resultFuture; + GrpcRetryCallable retryCallable = + new GrpcRetryCallable<>(callable, request, context); + GrpcRetryHandler retryHelper = new GrpcRetryHandler<>(clock, scheduler); + RetryFuture rv = retryHelper.createFirstAttempt(retryCallable, retryParams); + retryCallable.setExternalFuture(rv); + retryCallable.call(); + return rv; } @Override @@ -90,137 +85,6 @@ public String toString() { return String.format("retrying(%s)", callable); } - private class Retryer implements Runnable, ApiFutureCallback { - private final RequestT request; - private final CallContext context; - private final RetryingResultFuture resultFuture; - private final Duration retryDelay; - private final Duration rpcTimeout; - private final Throwable savedThrowable; - - private Retryer( - RequestT request, - CallContext context, - RetryingResultFuture resultFuture, - Duration retryDelay, - Duration rpcTimeout, - Throwable savedThrowable) { - this.request = request; - this.context = context; - this.resultFuture = resultFuture; - this.retryDelay = retryDelay; - this.rpcTimeout = rpcTimeout; - this.savedThrowable = savedThrowable; - } - - @Override - public void run() { - if (context.getCallOptions().getDeadlineNanoTime() < clock.nanoTime()) { - if (savedThrowable == null) { - resultFuture.setException( - Status.DEADLINE_EXCEEDED - .withDescription("Total deadline exceeded without completing any call") - .asException()); - } else { - resultFuture.setException(savedThrowable); - } - return; - } - CallContext deadlineContext = getCallContextWithDeadlineAfter(context, rpcTimeout); - resultFuture.issueCall(request, deadlineContext, this); - } - - @Override - public void onSuccess(ResponseT r) { - resultFuture.set(r); - } - - @Override - public void onFailure(Throwable throwable) { - if (!canRetry(throwable)) { - resultFuture.setException(throwable); - return; - } - if (isDeadlineExceeded(throwable)) { - Retryer retryer = - new Retryer(request, context, resultFuture, retryDelay, rpcTimeout, throwable); - resultFuture.scheduleNext( - executor, retryer, DEADLINE_SLEEP_DURATION.getMillis(), TimeUnit.MILLISECONDS); - return; - } - - long newRetryDelay = (long) (retryDelay.getMillis() * retryParams.getRetryDelayMultiplier()); - newRetryDelay = Math.min(newRetryDelay, retryParams.getMaxRetryDelay().getMillis()); - - long newRpcTimeout = (long) (rpcTimeout.getMillis() * retryParams.getRpcTimeoutMultiplier()); - newRpcTimeout = Math.min(newRpcTimeout, retryParams.getMaxRpcTimeout().getMillis()); - - long randomRetryDelay = ThreadLocalRandom.current().nextLong(retryDelay.getMillis()); - Retryer retryer = - new Retryer( - request, - context, - resultFuture, - Duration.millis(newRetryDelay), - Duration.millis(newRpcTimeout), - throwable); - resultFuture.scheduleNext(executor, retryer, randomRetryDelay, TimeUnit.MILLISECONDS); - } - } - - private class RetryingResultFuture extends AbstractApiFuture { - private volatile Future activeFuture = null; - private final Object syncObject = new Object(); - - @Override - protected void interruptTask() { - synchronized (syncObject) { - activeFuture.cancel(true); - } - } - - @Override - public boolean set(@Nullable ResponseT value) { - synchronized (syncObject) { - return super.set(value); - } - } - - @Override - public boolean setException(Throwable throwable) { - synchronized (syncObject) { - if (throwable instanceof CancellationException) { - super.cancel(false); - return true; - } else { - return super.setException(throwable); - } - } - } - - private void scheduleNext( - UnaryCallable.Scheduler executor, Runnable retryer, long delay, TimeUnit unit) { - synchronized (syncObject) { - if (!isCancelled()) { - activeFuture = executor.schedule(retryer, delay, TimeUnit.MILLISECONDS); - } - } - } - - public void issueCall( - RequestT request, - CallContext deadlineContext, - RetryingCallable.Retryer retryer) { - synchronized (syncObject) { - if (!isCancelled()) { - ApiFuture callFuture = callable.futureCall(request, deadlineContext); - ApiFutures.addCallback(callFuture, retryer); - activeFuture = callFuture; - } - } - } - } - private static CallContext getCallContextWithDeadlineAfter( CallContext oldCtx, Duration rpcTimeout) { CallOptions oldOpt = oldCtx.getCallOptions(); @@ -236,19 +100,61 @@ private static CallContext getCallContextWithDeadlineAfter( return newCtx; } - private static boolean canRetry(Throwable throwable) { - if (!(throwable instanceof ApiException)) { - return false; + private static class GrpcRetryCallable implements Callable { + private final FutureCallable callable; + private final RequestT request; + + private RetryFuture externalFuture; + private CallContext callContext; + + private GrpcRetryCallable( + FutureCallable callable, RequestT request, CallContext callContext) { + this.callable = callable; + this.request = request; + this.callContext = callContext; + } + + private void setExternalFuture(RetryFuture externalFuture) { + this.externalFuture = externalFuture; + } + + @Override + public ResponseT call() { + callContext = + getCallContextWithDeadlineAfter( + callContext, externalFuture.getAttemptSettings().getRpcTimeout()); + ApiFuture internalFuture = callable.futureCall(request, callContext); + externalFuture.setAttemptFuture(new ApiFutureToListenableFuture<>(internalFuture)); + return null; } - ApiException apiException = (ApiException) throwable; - return apiException.isRetryable(); } - private static boolean isDeadlineExceeded(Throwable throwable) { - if (!(throwable instanceof ApiException)) { - return false; + private static class GrpcRetryHandler extends ScheduledRetryHandler { + private GrpcRetryHandler(NanoClock clock, ScheduledExecutorService scheduler) { + super(clock, scheduler); + } + + @Override + public boolean accept(Throwable e, RetryAttemptSettings nextAttemptSettings) { + return super.accept(e, nextAttemptSettings) + && (e instanceof ApiException) + && ((ApiException) e).isRetryable(); + } + + @Override + public RetryAttemptSettings createNextAttemptSettings( + Throwable e, RetryAttemptSettings prevSettings) { + if (((ApiException) e).getStatusCode() == Code.DEADLINE_EXCEEDED) { + return new RetryAttemptSettings( + prevSettings.getGlobalSettings(), + prevSettings.getRetryDelay(), + prevSettings.getRpcTimeout(), + DEADLINE_SLEEP_DURATION, + prevSettings.getAttemptCount() + 1, + prevSettings.getFirstAttemptStartTime()); + } + + return super.createNextAttemptSettings(e, prevSettings); } - ApiException apiException = (ApiException) throwable; - return apiException.getStatusCode() == Status.Code.DEADLINE_EXCEEDED; } } diff --git a/src/main/java/com/google/api/gax/grpc/UnaryCallable.java b/src/main/java/com/google/api/gax/grpc/UnaryCallable.java index e579d64ed..604553314 100644 --- a/src/main/java/com/google/api/gax/grpc/UnaryCallable.java +++ b/src/main/java/com/google/api/gax/grpc/UnaryCallable.java @@ -30,6 +30,8 @@ package com.google.api.gax.grpc; import com.google.api.gax.core.ApiFuture; +import com.google.api.gax.core.DefaultNanoClock; +import com.google.api.gax.core.NanoClock; import com.google.api.gax.core.RetrySettings; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -323,7 +325,7 @@ UnaryCallable retryableOn(ImmutableSet retryab */ UnaryCallable retrying( RetrySettings retrySettings, ScheduledExecutorService executor) { - return retrying(retrySettings, new DelegatingScheduler(executor), DefaultNanoClock.create()); + return retrying(retrySettings, executor, DefaultNanoClock.getDefaultClock()); } /** @@ -336,7 +338,7 @@ UnaryCallable retrying( */ @VisibleForTesting UnaryCallable retrying( - RetrySettings retrySettings, Scheduler executor, NanoClock clock) { + RetrySettings retrySettings, ScheduledExecutorService executor, NanoClock clock) { return new UnaryCallable<>( new RetryingCallable<>(callable, retrySettings, executor, clock), channel, settings); } diff --git a/src/main/java/com/google/api/gax/retrying/AbstractRetryHandler.java b/src/main/java/com/google/api/gax/retrying/AbstractRetryHandler.java new file mode 100644 index 000000000..053d360a6 --- /dev/null +++ b/src/main/java/com/google/api/gax/retrying/AbstractRetryHandler.java @@ -0,0 +1,137 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.retrying; + +import com.google.api.gax.core.NanoClock; +import com.google.api.gax.core.RetrySettings; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import org.joda.time.Duration; + +/** + * Basic implementation of the retry {@link RetryHandler} interface. Responsible for defining and + * checking attempt's basic properties (execution time and count limits). + * + * This class is thread-safe, and all inheriting classes are required to be thread-safe. + * + * @param response type + */ +public abstract class AbstractRetryHandler implements RetryHandler { + + private final NanoClock clock; + + protected AbstractRetryHandler(NanoClock clock) { + this.clock = clock; + } + + /** + * Ensures that the retry logic hasn't exceeded neither maximum number of retries nor the total + * execution timeout. + * + * @param e exception thrown by the previous attempt + * @param nextAttemptSettings attempt settings, which will be used for the next attempt, if + * accepted + * @return {@code true} if none of the retry limits are exceeded + */ + @Override + public boolean accept(Throwable e, RetryAttemptSettings nextAttemptSettings) { + RetrySettings globalSettings = nextAttemptSettings.getGlobalSettings(); + long randRetryDelayMillis = nextAttemptSettings.getRandomizedRetryDelay().getMillis(); + long totalTimeSpentNanos = + clock.nanoTime() + - nextAttemptSettings.getFirstAttemptStartTime() + + TimeUnit.NANOSECONDS.convert(randRetryDelayMillis, TimeUnit.MILLISECONDS); + + long totalTimeoutMillis = globalSettings.getTotalTimeout().getMillis(); + long totalTimeoutNanos = + TimeUnit.NANOSECONDS.convert(totalTimeoutMillis, TimeUnit.MILLISECONDS); + + return totalTimeSpentNanos <= totalTimeoutNanos + && (globalSettings.getMaxAttempts() <= 0 + || nextAttemptSettings.getAttemptCount() < globalSettings.getMaxAttempts()); + } + + /** + * Crates next attempt settings. It increments the current attempt count and uses randomized + * exponential backoff factor for calculating next attempt execution time. + * + * @param e exception thrown by the previous attempt + * @param prevSettings previous attempt settings + * @return next attempt settings + */ + @Override + public RetryAttemptSettings createNextAttemptSettings( + Throwable e, RetryAttemptSettings prevSettings) { + RetrySettings settings = prevSettings.getGlobalSettings(); + + long newRetryDelay = settings.getInitialRetryDelay().getMillis(); + long newRpcTimeout = settings.getInitialRpcTimeout().getMillis(); + + if (prevSettings.getAttemptCount() > 0) { + newRetryDelay = + (long) (settings.getRetryDelayMultiplier() * prevSettings.getRetryDelay().getMillis()); + newRetryDelay = Math.min(newRetryDelay, settings.getMaxRetryDelay().getMillis()); + newRpcTimeout = + (long) (settings.getRpcTimeoutMultiplier() * prevSettings.getRpcTimeout().getMillis()); + newRpcTimeout = Math.min(newRpcTimeout, settings.getMaxRpcTimeout().getMillis()); + } + + return new RetryAttemptSettings( + prevSettings.getGlobalSettings(), + Duration.millis(newRetryDelay), + Duration.millis(newRpcTimeout), + Duration.millis(ThreadLocalRandom.current().nextLong(newRetryDelay)), + prevSettings.getAttemptCount() + 1, + prevSettings.getFirstAttemptStartTime()); + } + + /** + * Creates first attempt future. By default the first attempt is configured to be executed + * immediately. + * + * @param callable the actual callable, which should be executed in a retriable context + * @param globalSettings global retry settings (attempt independent) + */ + @Override + public RetryFuture createFirstAttempt( + Callable callable, RetrySettings globalSettings) { + RetryAttemptSettings firstAttemptSettings = + new RetryAttemptSettings( + globalSettings, + Duration.ZERO, + globalSettings.getTotalTimeout(), + Duration.ZERO, + 0, + clock.nanoTime()); + + return new RetryFutureImpl<>(callable, firstAttemptSettings, this); + } +} diff --git a/src/main/java/com/google/api/gax/retrying/DirectRetryHandler.java b/src/main/java/com/google/api/gax/retrying/DirectRetryHandler.java new file mode 100644 index 000000000..093a4ac3c --- /dev/null +++ b/src/main/java/com/google/api/gax/retrying/DirectRetryHandler.java @@ -0,0 +1,81 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.retrying; + +import com.google.api.gax.core.NanoClock; +import com.google.common.util.concurrent.Futures; +import java.io.InterruptedIOException; +import java.nio.channels.ClosedByInterruptException; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import org.joda.time.Duration; + +/** + * The retry handler, which executes attempts in the current thread, potentially causing the current + * thread to sleep for a specified amount of type before execution. + * + * This class is thread-safe. + * + * @param response type + */ +public class DirectRetryHandler extends AbstractRetryHandler { + + /** + * Creates a new default direct retry handler + * + * @param clock clock to use during execution scheduling + */ + public DirectRetryHandler(NanoClock clock) { + super(clock); + } + + /** + * Executes attempt in the current thread. Causes the current thread to sleep, if it is not the + * first attempt. + * + * @param callable the actual callable to execute + * @param attemptSettings current attempt settings + */ + @Override + public Future executeAttempt( + Callable callable, RetryAttemptSettings attemptSettings) { + try { + if (Duration.ZERO.compareTo(attemptSettings.getRandomizedRetryDelay()) < 0) { + Thread.sleep(attemptSettings.getRandomizedRetryDelay().getMillis()); + } + return Futures.immediateFuture(callable.call()); + } catch (InterruptedException | InterruptedIOException | ClosedByInterruptException e) { + Thread.currentThread().interrupt(); + return Futures.immediateFailedFuture(e); + } catch (Throwable e) { + return Futures.immediateFailedFuture(e); + } + } +} diff --git a/src/main/java/com/google/api/gax/retrying/RetryAttemptSettings.java b/src/main/java/com/google/api/gax/retrying/RetryAttemptSettings.java new file mode 100644 index 000000000..0916e4c94 --- /dev/null +++ b/src/main/java/com/google/api/gax/retrying/RetryAttemptSettings.java @@ -0,0 +1,106 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.retrying; + +import com.google.api.gax.core.RetrySettings; +import org.joda.time.Duration; + +/** + * Execution attempt settings. Defines attempt-specific properties of a retry process. + */ +public class RetryAttemptSettings { + + private final RetrySettings globalSettings; + private final Duration retryDelay; + private final Duration rpcTimeout; + private final Duration randomizedRetryDelay; + private final int attemptCount; + private final long firstAttemptStartTime; + + public RetryAttemptSettings( + RetrySettings globalSettings, + Duration retryDelay, + Duration rpcTimeout, + Duration randomizedRetryDelay, + int attemptCount, + long firstAttemptStartTime) { + this.globalSettings = globalSettings; + this.retryDelay = retryDelay; + this.rpcTimeout = rpcTimeout; + this.randomizedRetryDelay = randomizedRetryDelay; + this.attemptCount = attemptCount; + this.firstAttemptStartTime = firstAttemptStartTime; + } + + /** + * Returns global (attempt-independent) retry settings. + */ + public RetrySettings getGlobalSettings() { + return globalSettings; + } + + /** + * Returns the calculated retry delay. Note that the actual delay used for retry scheduling may be + * different (randomized, based on this value). + */ + public Duration getRetryDelay() { + return retryDelay; + } + + /** + * Returns rpc timeout used for this attempt. + */ + public Duration getRpcTimeout() { + return rpcTimeout; + } + + /** + * Returns randomized attempt delay. By default this value is calculated based on the + * {@code retryDelay} value, and is used as the actual attempt execution delay. + */ + public Duration getRandomizedRetryDelay() { + return randomizedRetryDelay; + } + + /** + * The attempt count. It is a zero-based value (first attempt will have this value set to 0). + */ + public int getAttemptCount() { + return attemptCount; + } + + /** + * The start time of the first attempt. Note that this value is dependent on the actual + * {@link com.google.api.gax.core.NanoClock} used during the process. + */ + public long getFirstAttemptStartTime() { + return firstAttemptStartTime; + } +} diff --git a/src/main/java/com/google/api/gax/retrying/RetryFuture.java b/src/main/java/com/google/api/gax/retrying/RetryFuture.java new file mode 100644 index 000000000..80672edc4 --- /dev/null +++ b/src/main/java/com/google/api/gax/retrying/RetryFuture.java @@ -0,0 +1,57 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.retrying; + +import com.google.api.gax.core.ApiFuture; +import java.util.concurrent.Future; + +/** + * Represents retriable future. This is a facade hiding all the complications of a an asynchronous + * execution of a retriable task. + * + * This interface is for advanced/internal use only. + * + * @param response type + */ +public interface RetryFuture extends ApiFuture { + + /** + * Set the attempt future. This future represents a concrete retry attempt, potentially scheduled + * for execution in some form of {@link java.util.concurrent.ScheduledExecutorService}. + * + * @param attemptFuture the attempt future + */ + void setAttemptFuture(Future attemptFuture); + + /** + * Returns current (active) attempt settings. + */ + RetryAttemptSettings getAttemptSettings(); +} diff --git a/src/main/java/com/google/api/gax/retrying/RetryFutureImpl.java b/src/main/java/com/google/api/gax/retrying/RetryFutureImpl.java new file mode 100644 index 000000000..9c44b0bc1 --- /dev/null +++ b/src/main/java/com/google/api/gax/retrying/RetryFutureImpl.java @@ -0,0 +1,175 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.google.api.gax.retrying; + +import com.google.api.gax.core.ApiFutureCallback; +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + +/** + * For internal use only. + * + * This class is the key component of the retry logic. It implements {@link RetryFuture} facade + * interface, and is doing the following: + * + *
    + *
  • Schedules next attempt in case of failure using callback chaining technique.
  • + *
  • Terminates retrying process if more retries are not accepted.
  • + *
  • Propagates future cancellation in both directions (from this to the attempt and from the + * attempt to this)
  • + *
+ * + * This class is thread-safe. + */ +class RetryFutureImpl extends AbstractFuture + implements RetryFuture { + + private final Object lock = new Object(); + private final Callable callable; + private final RetryHandler retryHandler; + + private volatile RetryAttemptSettings attemptSettings; + private volatile AttemptFutureCallback callbackFutureCallback; + + RetryFutureImpl( + Callable callable, + RetryAttemptSettings attemptSettings, + RetryHandler retryHandler) { + this.callable = callable; + this.attemptSettings = attemptSettings; + this.retryHandler = retryHandler; + } + + @Override + protected boolean set(ResponseT value) { + return super.set(value); + } + + @Override + protected boolean setException(Throwable throwable) { + return super.setException(throwable); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + synchronized (lock) { + boolean rv = super.cancel(mayInterruptIfRunning); + if (callbackFutureCallback != null) { + callbackFutureCallback.attemptFuture.cancel(mayInterruptIfRunning); + } + return rv; + } + } + + @Override + public RetryAttemptSettings getAttemptSettings() { + synchronized (lock) { + return attemptSettings; + } + } + + @Override + public void setAttemptFuture(Future attemptFuture) { + synchronized (lock) { + if (attemptFuture != null) { + callbackFutureCallback = new AttemptFutureCallback(attemptFuture); + Futures.addCallback((ListenableFuture) attemptFuture, callbackFutureCallback); + if (isCancelled()) { + attemptFuture.cancel(false); + } + } else { + callbackFutureCallback = null; + } + } + } + + private void scheduleRetry(Throwable delegateThrowable, Future prevProxiedFuture) { + try { + if (prevProxiedFuture.isCancelled()) { + cancel(false); + } + if (isDone()) { + return; + } + + RetryAttemptSettings nextAttemptSettings = + retryHandler.createNextAttemptSettings(delegateThrowable, attemptSettings); + if (retryHandler.accept(delegateThrowable, nextAttemptSettings)) { + attemptSettings = nextAttemptSettings; + Future nextInternalFuture = + retryHandler.executeAttempt(callable, attemptSettings); + setAttemptFuture(nextInternalFuture); + } else { + setException(delegateThrowable); + } + } catch (Throwable e) { + setException(delegateThrowable); + } + } + + private class AttemptFutureCallback + implements FutureCallback, ApiFutureCallback { + + private Future attemptFuture; + + private AttemptFutureCallback(Future attemptFuture) { + this.attemptFuture = attemptFuture; + } + + @Override + public void onSuccess(ResponseT result) { + if (this == callbackFutureCallback && !isDone()) { + synchronized (lock) { + if (this == callbackFutureCallback && !isDone()) { + setAttemptFuture(null); + set(result); + } + } + } + } + + @Override + public void onFailure(Throwable t) { + if (this == callbackFutureCallback && !isDone()) { + synchronized (lock) { + if (this == callbackFutureCallback && !isDone()) { + setAttemptFuture(null); + scheduleRetry(t, this.attemptFuture); + } + } + } + } + } +} diff --git a/src/main/java/com/google/api/gax/retrying/RetryHandler.java b/src/main/java/com/google/api/gax/retrying/RetryHandler.java new file mode 100644 index 000000000..8ea6037f2 --- /dev/null +++ b/src/main/java/com/google/api/gax/retrying/RetryHandler.java @@ -0,0 +1,101 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.retrying; + +import com.google.api.gax.core.RetrySettings; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + +/** + * A retry handler is responsible for the following operations: + *
    + *
  1. Accepting or rejecting a task for retry, depending on the previous attempt result (exception) + * and/or other attempt properties (like number of already executed attempts or total time spent + * retrying).
  2. + * + *
  3. Creating first attempt {@link RetryFuture}, which acts as a facade, hiding from client code + * the actual scheduled retry task.
  4. + * + *
  5. Creating {@link RetryAttemptSettings} for each subsequnt retry attempt.
  6. + * + *
  7. Executing the actual {@link Callable} in a retriable context.
  8. + * + *
+ * + * This interface is for internal/advanced use only. + * + * @param response type + */ +public interface RetryHandler { + + /** + * Returns {@code true} if another attempt should be made, or {@code false} otherwise + * + * @param e exception thrown by the previous attempt + * @param nextAttemptSettings attempt settings, which will be used for the next attempt, if + * accepted + * @return {@code true} if another attempt should be made, or {@code false} otherwise + */ + boolean accept(Throwable e, RetryAttemptSettings nextAttemptSettings); + + /** + * Creates a first try {@link RetryFuture}, which is a facade, returned to the client code to wait + * for any retriable operation to complete. + * + * @param callable the actual callable, which should be executed in a retriable context + * @param globalSettings global retry settings (attempt independent) + * @return retriable future facade + */ + RetryFuture createFirstAttempt( + Callable callable, RetrySettings globalSettings); + + /** + * Creates next attempt {@link RetryAttemptSettings}, which defines properties of the next + * attempt. Eventually this object will be passed to + * {@link RetryHandler#accept(Throwable, RetryAttemptSettings)} and + * {@link RetryHandler#executeAttempt(Callable, RetryAttemptSettings)}. + * + * @param e exception thrown by the previous attempt + * @param prevSettings previous attempt settings + * @return next attempt settings + */ + RetryAttemptSettings createNextAttemptSettings(Throwable e, RetryAttemptSettings prevSettings); + + /** + * Executes an attempt. A typical implementation will either try to execute in the current thread + * or schedule it for an execution, using some sort of async execution service. + * + * @param callable the actual callable to execute + * @param attemptSettings current attempt settings + * @return the {@link Future}, representing the scheduled execution + */ + Future executeAttempt( + Callable callable, RetryAttemptSettings attemptSettings); +} diff --git a/src/main/java/com/google/api/gax/retrying/ScheduledRetryHandler.java b/src/main/java/com/google/api/gax/retrying/ScheduledRetryHandler.java new file mode 100644 index 000000000..5be4d741d --- /dev/null +++ b/src/main/java/com/google/api/gax/retrying/ScheduledRetryHandler.java @@ -0,0 +1,87 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.retrying; + +import com.google.api.gax.core.NanoClock; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * A retry handler which uses {@link ScheduledExecutorService} to schedule attempt tasks. Unless a + * direct executor service is used, this handler will schedule attempts for an execution in another + * thread. + * + * This class is thread-safe. + * + * @param + */ +public class ScheduledRetryHandler extends AbstractRetryHandler { + private final ListeningScheduledExecutorService scheduler; + + /** + * Creates new scheduled retry handler, which will be using {@link ScheduledExecutorService} for + * acutal attempts scheduling. + * + * @param clock clock to use for scheduling operations + * @param scheduler scheduler + */ + public ScheduledRetryHandler(NanoClock clock, ScheduledExecutorService scheduler) { + super(clock); + this.scheduler = MoreExecutors.listeningDecorator(scheduler); + } + + /** + * Executes attempt using previously provided shceduller. + * + * @param callable the actual callable to execute + * @param attemptSettings current attempt settings + * @return actual attempt future + */ + @Override + public Future executeAttempt( + Callable callable, RetryAttemptSettings attemptSettings) { + try { + System.out.println( + "Scheduling with delay = " + attemptSettings.getRandomizedRetryDelay().getMillis()); + + return scheduler.schedule( + callable, attemptSettings.getRandomizedRetryDelay().getMillis(), TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException e) { + System.out.println("Rejected"); + return Futures.immediateCancelledFuture(); + } + } +} diff --git a/src/test/java/com/google/api/gax/grpc/CancellationTest.java b/src/test/java/com/google/api/gax/grpc/CancellationTest.java index 7b4f92cff..d951805e6 100644 --- a/src/test/java/com/google/api/gax/grpc/CancellationTest.java +++ b/src/test/java/com/google/api/gax/grpc/CancellationTest.java @@ -29,11 +29,14 @@ */ package com.google.api.gax.grpc; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.when; + import com.google.api.gax.core.AbstractApiFuture; import com.google.api.gax.core.ApiFuture; import com.google.api.gax.core.RetrySettings; import com.google.api.gax.core.SettableApiFuture; -import com.google.api.gax.grpc.UnaryCallable.Scheduler; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.truth.Truth; @@ -54,6 +57,8 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; @RunWith(JUnit4.class) public class CancellationTest { @@ -90,7 +95,7 @@ public class CancellationTest { @Before public void resetClock() { fakeClock = new FakeNanoClock(System.nanoTime()); - executor = new RecordingScheduler(fakeClock); + executor = RecordingScheduler.get(fakeClock); } @After @@ -115,24 +120,36 @@ public void cancellationBeforeGetOnRetryingCallable() throws Exception { resultFuture.get(); } - private static class LatchCountDownScheduler implements UnaryCallable.Scheduler { - private final ScheduledExecutorService executor; - private final CountDownLatch latch; - - LatchCountDownScheduler(CountDownLatch latch) { - this.executor = new ScheduledThreadPoolExecutor(1); - this.latch = latch; - } - - @Override - public ScheduledFuture schedule(Runnable runnable, long delay, TimeUnit unit) { - latch.countDown(); - return executor.schedule(runnable, delay, unit); - } - - @Override - public List shutdownNow() { - return executor.shutdownNow(); + private abstract static class LatchCountDownScheduler implements ScheduledExecutorService { + private static LatchCountDownScheduler get(final CountDownLatch latch) { + LatchCountDownScheduler mock = Mockito.mock(LatchCountDownScheduler.class); + + // mock class fields: + final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1); + + // mock class methods: + // ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit); + when(mock.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) + .then( + new Answer>() { + @Override + public ScheduledFuture answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + latch.countDown(); + return executor.schedule((Runnable) args[0], (Long) args[1], (TimeUnit) args[2]); + } + }); + // List shutdownNow() + when(mock.shutdownNow()) + .then( + new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return executor.shutdownNow(); + } + }); + + return mock; } } @@ -190,10 +207,7 @@ public void cancellationDuringFirstCall() throws Exception { UnaryCallable callable = UnaryCallable.create(innerCallable) .retryableOn(retryable) - .retrying( - FAST_RETRY_SETTINGS, - new UnaryCallable.DelegatingScheduler(new ScheduledThreadPoolExecutor(1)), - fakeClock); + .retrying(FAST_RETRY_SETTINGS, new ScheduledThreadPoolExecutor(1), fakeClock); ApiFuture resultFuture = callable.futureCall(0); CancellationHelpers.cancelInThreadAfterLatchCountDown(resultFuture, callIssuedLatch); @@ -210,16 +224,16 @@ public void cancellationDuringFirstCall() throws Exception { @Test public void cancellationDuringRetryDelay() throws Exception { Throwable throwable = Status.UNAVAILABLE.asException(); - CancellationTrackingFuture innerFuture = CancellationTrackingFuture.create(); + CancellationTrackingFuture innerFuture = CancellationTrackingFuture.create(); Mockito.when(callInt.futureCall((Integer) Mockito.any(), (CallContext) Mockito.any())) .thenReturn(UnaryCallableTest.immediateFailedFuture(throwable)) .thenReturn(innerFuture); CountDownLatch retryScheduledLatch = new CountDownLatch(1); - Scheduler scheduler = new LatchCountDownScheduler(retryScheduledLatch); - ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); + LatchCountDownScheduler scheduler = LatchCountDownScheduler.get(retryScheduledLatch); + ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); UnaryCallable callable = - UnaryCallable.create(callInt) + UnaryCallable.create(callInt) .retryableOn(retryable) .retrying(SLOW_RETRY_SETTINGS, scheduler, fakeClock); @@ -239,21 +253,18 @@ public void cancellationDuringRetryDelay() throws Exception { public void cancellationDuringSecondCall() throws Exception { Throwable throwable = Status.UNAVAILABLE.asException(); ApiFuture failingFuture = UnaryCallableTest.immediateFailedFuture(throwable); - CancellationTrackingFuture innerFuture = CancellationTrackingFuture.create(); + CancellationTrackingFuture innerFuture = CancellationTrackingFuture.create(); CountDownLatch callIssuedLatch = new CountDownLatch(2); @SuppressWarnings("unchecked") FutureCallable innerCallable = - new LatchCountDownFutureCallable( - callIssuedLatch, Lists.>newArrayList(failingFuture, innerFuture)); + new LatchCountDownFutureCallable<>( + callIssuedLatch, Lists.newArrayList(failingFuture, innerFuture)); - ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); + ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); UnaryCallable callable = - UnaryCallable.create(innerCallable) + UnaryCallable.create(innerCallable) .retryableOn(retryable) - .retrying( - FAST_RETRY_SETTINGS, - new UnaryCallable.DelegatingScheduler(new ScheduledThreadPoolExecutor(1)), - fakeClock); + .retrying(FAST_RETRY_SETTINGS, new ScheduledThreadPoolExecutor(1), fakeClock); ApiFuture resultFuture = callable.futureCall(0); CancellationHelpers.cancelInThreadAfterLatchCountDown(resultFuture, callIssuedLatch); diff --git a/src/test/java/com/google/api/gax/grpc/FakeNanoClock.java b/src/test/java/com/google/api/gax/grpc/FakeNanoClock.java index 1fb7695cc..a735a9af3 100644 --- a/src/test/java/com/google/api/gax/grpc/FakeNanoClock.java +++ b/src/test/java/com/google/api/gax/grpc/FakeNanoClock.java @@ -29,8 +29,9 @@ */ package com.google.api.gax.grpc; -class FakeNanoClock implements NanoClock { +import com.google.api.gax.core.NanoClock; +class FakeNanoClock implements NanoClock { private volatile long currentNanoTime; public FakeNanoClock(long initialNanoTime) { @@ -42,6 +43,11 @@ public long nanoTime() { return currentNanoTime; } + @Override + public long millisTime() { + return nanoTime() / 1000_000L; + } + public void setCurrentNanoTime(long nanoTime) { currentNanoTime = nanoTime; } diff --git a/src/test/java/com/google/api/gax/grpc/RecordingScheduler.java b/src/test/java/com/google/api/gax/grpc/RecordingScheduler.java index 66f9c4c11..01a3f6e16 100644 --- a/src/test/java/com/google/api/gax/grpc/RecordingScheduler.java +++ b/src/test/java/com/google/api/gax/grpc/RecordingScheduler.java @@ -29,6 +29,10 @@ */ package com.google.api.gax.grpc; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.when; + import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledExecutorService; @@ -36,34 +40,52 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.joda.time.Duration; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; -class RecordingScheduler implements UnaryCallable.Scheduler { - private final ScheduledExecutorService executor; - private final List sleepDurations = new ArrayList<>(); - private final FakeNanoClock clock; +abstract class RecordingScheduler implements ScheduledExecutorService { - public RecordingScheduler(FakeNanoClock clock) { - this.executor = new ScheduledThreadPoolExecutor(1); - this.clock = clock; - } + abstract List getSleepDurations(); - @Override - public ScheduledFuture schedule(Runnable runnable, long delay, TimeUnit unit) { - sleepDurations.add(new Duration(TimeUnit.MILLISECONDS.convert(delay, unit))); - clock.setCurrentNanoTime(clock.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, unit)); - return executor.schedule(runnable, 0, TimeUnit.NANOSECONDS); - } + static RecordingScheduler get(final FakeNanoClock clock) { + RecordingScheduler mock = Mockito.mock(RecordingScheduler.class); - @Override - public List shutdownNow() { - return executor.shutdownNow(); - } + // mock class fields: + final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1); + final List sleepDurations = new ArrayList<>(); - public List getSleepDurations() { - return sleepDurations; - } + // mock class methods: + // ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit); + when(mock.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) + .then( + new Answer>() { + @Override + public ScheduledFuture answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + Runnable runnable = (Runnable) args[0]; + Long delay = (Long) args[1]; + TimeUnit unit = (TimeUnit) args[2]; + sleepDurations.add(new Duration(TimeUnit.MILLISECONDS.convert(delay, unit))); + clock.setCurrentNanoTime( + clock.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, unit)); + return executor.schedule(runnable, 0, TimeUnit.NANOSECONDS); + } + }); + + // List shutdownNow() + when(mock.shutdownNow()) + .then( + new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return executor.shutdownNow(); + } + }); + + // List getSleepDurations() + when(mock.getSleepDurations()).thenReturn(sleepDurations); - public FakeNanoClock getClock() { - return clock; + return mock; } } diff --git a/src/test/java/com/google/api/gax/grpc/UnaryCallableTest.java b/src/test/java/com/google/api/gax/grpc/UnaryCallableTest.java index 619f7002b..2b0dffe5d 100644 --- a/src/test/java/com/google/api/gax/grpc/UnaryCallableTest.java +++ b/src/test/java/com/google/api/gax/grpc/UnaryCallableTest.java @@ -92,7 +92,7 @@ static ApiFuture immediateFailedFuture(Throwable t) { @Before public void resetClock() { fakeClock = new FakeNanoClock(System.nanoTime()); - executor = new RecordingScheduler(fakeClock); + executor = RecordingScheduler.get(fakeClock); } @After @@ -124,7 +124,7 @@ public ApiFuture futureCall(RequestT request, CallContext context) { public void simpleCall() throws Exception { StashCallable stash = new StashCallable<>(1); - UnaryCallable callable = UnaryCallable.create(stash); + UnaryCallable callable = UnaryCallable.create(stash); Integer response = callable.call(2, CallContext.createDefault()); Truth.assertThat(response).isEqualTo(Integer.valueOf(1)); Truth.assertThat(stash.context.getChannel()).isNull(); @@ -137,7 +137,7 @@ public void simpleCall() throws Exception { public void bind() { Channel channel = Mockito.mock(Channel.class); StashCallable stash = new StashCallable<>(0); - UnaryCallable.create(stash).bind(channel).futureCall(0); + UnaryCallable.create(stash).bind(channel).futureCall(0); Truth.assertThat(stash.context.getChannel()).isSameAs(channel); } @@ -146,9 +146,9 @@ public void retryableBind() throws Exception { Channel channel = Mockito.mock(Channel.class); StashCallable stash = new StashCallable<>(0); - ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); + ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); UnaryCallable callable = - UnaryCallable.create(stash) + UnaryCallable.create(stash) .bind(channel) .retryableOn(retryable) .retrying(FAST_RETRY_SETTINGS, executor, fakeClock); @@ -162,10 +162,7 @@ public void pagedBind() { StashCallable> stash = new StashCallable>(new ArrayList()); - UnaryCallable.>create(stash) - .bind(channel) - .paged(new PagedFactory()) - .call(0); + UnaryCallable.create(stash).bind(channel).paged(new PagedFactory()).call(0); Truth.assertThat(stash.context.getChannel()).isSameAs(channel); } @@ -253,7 +250,7 @@ public void bundlingBind() throws Exception { @Test public void retry() { - ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); + ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); Throwable throwable = Status.UNAVAILABLE.asException(); Mockito.when(callInt.futureCall((Integer) Mockito.any(), (CallContext) Mockito.any())) .thenReturn(UnaryCallableTest.immediateFailedFuture(throwable)) @@ -261,15 +258,71 @@ public void retry() { .thenReturn(UnaryCallableTest.immediateFailedFuture(throwable)) .thenReturn(immediateFuture(2)); UnaryCallable callable = - UnaryCallable.create(callInt) + UnaryCallable.create(callInt) .retryableOn(retryable) .retrying(FAST_RETRY_SETTINGS, executor, fakeClock); Truth.assertThat(callable.call(1)).isEqualTo(2); } + @Test(expected = ApiException.class) + public void retryTotalTimeoutExceeded() { + ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); + Throwable throwable = Status.UNAVAILABLE.asException(); + Mockito.when(callInt.futureCall((Integer) Mockito.any(), (CallContext) Mockito.any())) + .thenReturn(UnaryCallableTest.immediateFailedFuture(throwable)) + .thenReturn(immediateFuture(2)); + + RetrySettings retrySettings = + FAST_RETRY_SETTINGS + .toBuilder() + .setInitialRetryDelay(Duration.millis(Integer.MAX_VALUE)) + .setMaxRetryDelay(Duration.millis(Integer.MAX_VALUE)) + .build(); + UnaryCallable callable = + UnaryCallable.create(callInt) + .retryableOn(retryable) + .retrying(retrySettings, executor, fakeClock); + callable.call(1); + } + + @Test(expected = ApiException.class) + public void retryMaxAttemptsExeeded() { + ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); + Throwable throwable = Status.UNAVAILABLE.asException(); + Mockito.when(callInt.futureCall((Integer) Mockito.any(), (CallContext) Mockito.any())) + .thenReturn(UnaryCallableTest.immediateFailedFuture(throwable)) + .thenReturn(UnaryCallableTest.immediateFailedFuture(throwable)) + .thenReturn(immediateFuture(2)); + + RetrySettings retrySettings = FAST_RETRY_SETTINGS.toBuilder().setMaxAttempts(2).build(); + UnaryCallable callable = + UnaryCallable.create(callInt) + .retryableOn(retryable) + .retrying(retrySettings, executor, fakeClock); + callable.call(1); + } + + @Test + public void retryWithinMaxAttempts() { + ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); + Throwable throwable = Status.UNAVAILABLE.asException(); + Mockito.when(callInt.futureCall((Integer) Mockito.any(), (CallContext) Mockito.any())) + .thenReturn(UnaryCallableTest.immediateFailedFuture(throwable)) + .thenReturn(UnaryCallableTest.immediateFailedFuture(throwable)) + .thenReturn(immediateFuture(2)); + + RetrySettings retrySettings = FAST_RETRY_SETTINGS.toBuilder().setMaxAttempts(3).build(); + UnaryCallable callable = + UnaryCallable.create(callInt) + .retryableOn(retryable) + .retrying(retrySettings, executor, fakeClock); + callable.call(1); + Truth.assertThat(callable.call(1)).isEqualTo(2); + } + @Test public void retryOnStatusUnknown() { - ImmutableSet retryable = ImmutableSet.of(Status.Code.UNKNOWN); + ImmutableSet retryable = ImmutableSet.of(Status.Code.UNKNOWN); Throwable throwable = Status.UNKNOWN.asException(); Mockito.when(callInt.futureCall((Integer) Mockito.any(), (CallContext) Mockito.any())) .thenReturn(UnaryCallableTest.immediateFailedFuture(throwable)) @@ -277,7 +330,7 @@ public void retryOnStatusUnknown() { .thenReturn(UnaryCallableTest.immediateFailedFuture(throwable)) .thenReturn(immediateFuture(2)); UnaryCallable callable = - UnaryCallable.create(callInt) + UnaryCallable.create(callInt) .retryableOn(retryable) .retrying(FAST_RETRY_SETTINGS, executor, fakeClock); Truth.assertThat(callable.call(1)).isEqualTo(2); @@ -287,12 +340,12 @@ public void retryOnStatusUnknown() { public void retryOnUnexpectedException() { thrown.expect(ApiException.class); thrown.expectMessage("foobar"); - ImmutableSet retryable = ImmutableSet.of(Status.Code.UNKNOWN); + ImmutableSet retryable = ImmutableSet.of(Status.Code.UNKNOWN); Throwable throwable = new RuntimeException("foobar"); Mockito.when(callInt.futureCall((Integer) Mockito.any(), (CallContext) Mockito.any())) .thenReturn(UnaryCallableTest.immediateFailedFuture(throwable)); UnaryCallable callable = - UnaryCallable.create(callInt) + UnaryCallable.create(callInt) .retryableOn(retryable) .retrying(FAST_RETRY_SETTINGS, executor, fakeClock); callable.call(1); @@ -302,14 +355,14 @@ public void retryOnUnexpectedException() { public void retryNoRecover() { thrown.expect(ApiException.class); thrown.expectMessage("foobar"); - ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); + ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); Mockito.when(callInt.futureCall((Integer) Mockito.any(), (CallContext) Mockito.any())) .thenReturn( UnaryCallableTest.immediateFailedFuture( Status.FAILED_PRECONDITION.withDescription("foobar").asException())) .thenReturn(immediateFuture(2)); UnaryCallable callable = - UnaryCallable.create(callInt) + UnaryCallable.create(callInt) .retryableOn(retryable) .retrying(FAST_RETRY_SETTINGS, executor, fakeClock); callable.call(1); @@ -319,13 +372,13 @@ public void retryNoRecover() { public void retryKeepFailing() { thrown.expect(UncheckedExecutionException.class); thrown.expectMessage("foobar"); - ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); + ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); Mockito.when(callInt.futureCall((Integer) Mockito.any(), (CallContext) Mockito.any())) .thenReturn( UnaryCallableTest.immediateFailedFuture( Status.UNAVAILABLE.withDescription("foobar").asException())); UnaryCallable callable = - UnaryCallable.create(callInt) + UnaryCallable.create(callInt) .retryableOn(retryable) .retrying(FAST_RETRY_SETTINGS, executor, fakeClock); // Need to advance time inside the call. @@ -336,7 +389,7 @@ public void retryKeepFailing() { @Test public void noSleepOnRetryTimeout() { ImmutableSet retryable = - ImmutableSet.of(Status.Code.UNAVAILABLE, Status.Code.DEADLINE_EXCEEDED); + ImmutableSet.of(Status.Code.UNAVAILABLE, Status.Code.DEADLINE_EXCEEDED); Mockito.when(callInt.futureCall((Integer) Mockito.any(), (CallContext) Mockito.any())) .thenReturn( UnaryCallableTest.immediateFailedFuture( @@ -344,7 +397,7 @@ public void noSleepOnRetryTimeout() { .thenReturn(immediateFuture(2)); UnaryCallable callable = - UnaryCallable.create(callInt) + UnaryCallable.create(callInt) .retryableOn(retryable) .retrying(FAST_RETRY_SETTINGS, executor, fakeClock); callable.call(1); @@ -421,7 +474,7 @@ public void paged() { .thenReturn(immediateFuture(Arrays.asList(3, 4))) .thenReturn(immediateFuture(Collections.emptyList())); Truth.assertThat( - UnaryCallable.>create(callIntList) + UnaryCallable.create(callIntList) .paged(new PagedFactory()) .call(0) .iterateAllElements()) @@ -436,10 +489,7 @@ public void pagedByPage() { .thenReturn(immediateFuture(Arrays.asList(3, 4))) .thenReturn(immediateFuture(Collections.emptyList())); Page, Integer> page = - UnaryCallable.>create(callIntList) - .paged(new PagedFactory()) - .call(0) - .getPage(); + UnaryCallable.create(callIntList).paged(new PagedFactory()).call(0).getPage(); Truth.assertThat(page).containsExactly(0, 1, 2).inOrder(); Truth.assertThat(page.getNextPage()).containsExactly(3, 4).inOrder(); @@ -453,7 +503,7 @@ public void pagedByFixedSizeCollection() { .thenReturn(immediateFuture(Arrays.asList(5, 6, 7))) .thenReturn(immediateFuture(Collections.emptyList())); FixedSizeCollection fixedSizeCollection = - UnaryCallable.>create(callIntList) + UnaryCallable.create(callIntList) .paged(new PagedFactory()) .call(0) .expandToFixedSizeCollection(5); @@ -469,7 +519,7 @@ public void pagedFixedSizeCollectionTooManyElements() { .thenReturn(immediateFuture(Arrays.asList(3, 4))) .thenReturn(immediateFuture(Collections.emptyList())); - UnaryCallable.>create(callIntList) + UnaryCallable.create(callIntList) .paged(new PagedFactory()) .call(0) .expandToFixedSizeCollection(4); @@ -481,7 +531,7 @@ public void pagedFixedSizeCollectionTooSmallCollectionSize() { .thenReturn(immediateFuture(Arrays.asList(0, 1))) .thenReturn(immediateFuture(Collections.emptyList())); - UnaryCallable.>create(callIntList) + UnaryCallable.create(callIntList) .paged(new PagedFactory()) .call(0) .expandToFixedSizeCollection(2); @@ -591,7 +641,7 @@ public void bundling() throws Exception { new BundlerFactory<>(SQUARER_BUNDLING_DESC, bundlingSettings); try { UnaryCallable> callable = - UnaryCallable.>create(callLabeledIntSquarer) + UnaryCallable.create(callLabeledIntSquarer) .bundling(SQUARER_BUNDLING_DESC, bundlerFactory); ApiFuture> f1 = callable.futureCall(new LabeledIntList("one", 1, 2)); ApiFuture> f2 = callable.futureCall(new LabeledIntList("one", 3, 4)); @@ -650,7 +700,7 @@ public void bundlingDisabled() throws Exception { new BundlerFactory<>(DISABLED_BUNDLING_DESC, bundlingSettings); try { UnaryCallable> callable = - UnaryCallable.>create(callLabeledIntSquarer) + UnaryCallable.create(callLabeledIntSquarer) .bundling(DISABLED_BUNDLING_DESC, bundlerFactory); ApiFuture> f1 = callable.futureCall(new LabeledIntList("one", 1, 2)); ApiFuture> f2 = callable.futureCall(new LabeledIntList("one", 3, 4)); @@ -671,7 +721,7 @@ public void bundlingWithBlockingCallThreshold() throws Exception { new BundlerFactory<>(SQUARER_BUNDLING_DESC, bundlingSettings); try { UnaryCallable> callable = - UnaryCallable.>create(callLabeledIntSquarer) + UnaryCallable.create(callLabeledIntSquarer) .bundling(SQUARER_BUNDLING_DESC, bundlerFactory); ApiFuture> f1 = callable.futureCall(new LabeledIntList("one", 1)); ApiFuture> f2 = callable.futureCall(new LabeledIntList("one", 3)); @@ -686,8 +736,7 @@ public void bundlingWithBlockingCallThreshold() throws Exception { new FutureCallable>() { @Override public ApiFuture> futureCall(LabeledIntList request, CallContext context) { - return UnaryCallableTest.>immediateFailedFuture( - new IllegalArgumentException("I FAIL!!")); + return UnaryCallableTest.immediateFailedFuture(new IllegalArgumentException("I FAIL!!")); } }; @@ -702,7 +751,7 @@ public void bundlingException() throws Exception { new BundlerFactory<>(SQUARER_BUNDLING_DESC, bundlingSettings); try { UnaryCallable> callable = - UnaryCallable.>create(callLabeledIntExceptionThrower) + UnaryCallable.create(callLabeledIntExceptionThrower) .bundling(SQUARER_BUNDLING_DESC, bundlerFactory); ApiFuture> f1 = callable.futureCall(new LabeledIntList("one", 1, 2)); ApiFuture> f2 = callable.futureCall(new LabeledIntList("one", 3, 4)); @@ -728,13 +777,12 @@ public void bundlingException() throws Exception { @Test public void testKnownStatusCode() { - ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); + ImmutableSet retryable = ImmutableSet.of(Status.Code.UNAVAILABLE); Mockito.when(callInt.futureCall((Integer) Mockito.any(), (CallContext) Mockito.any())) .thenReturn( UnaryCallableTest.immediateFailedFuture( Status.FAILED_PRECONDITION.withDescription("known").asException())); - UnaryCallable callable = - UnaryCallable.create(callInt).retryableOn(retryable); + UnaryCallable callable = UnaryCallable.create(callInt).retryableOn(retryable); try { callable.call(1); } catch (ApiException exception) { @@ -746,12 +794,11 @@ public void testKnownStatusCode() { @Test public void testUnknownStatusCode() { - ImmutableSet retryable = ImmutableSet.of(); + ImmutableSet retryable = ImmutableSet.of(); Mockito.when(callInt.futureCall((Integer) Mockito.any(), (CallContext) Mockito.any())) .thenReturn( UnaryCallableTest.immediateFailedFuture(new RuntimeException("unknown"))); - UnaryCallable callable = - UnaryCallable.create(callInt).retryableOn(retryable); + UnaryCallable callable = UnaryCallable.create(callInt).retryableOn(retryable); try { callable.call(1); } catch (ApiException exception) { diff --git a/src/test/java/com/google/api/gax/retrying/AbstractRetryHandlerTest.java b/src/test/java/com/google/api/gax/retrying/AbstractRetryHandlerTest.java new file mode 100644 index 000000000..85bf40d65 --- /dev/null +++ b/src/test/java/com/google/api/gax/retrying/AbstractRetryHandlerTest.java @@ -0,0 +1,140 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.retrying; + +import static com.google.api.gax.retrying.FailingCallable.FAST_RETRY_SETTINGS; +import static junit.framework.TestCase.assertFalse; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.api.gax.core.RetrySettings; +import com.google.api.gax.retrying.FailingCallable.CustomException; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import org.joda.time.Duration; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public abstract class AbstractRetryHandlerTest { + + protected abstract RetryHandler getRetryHandler(); + + @Test + public void testNoFailures() throws ExecutionException, InterruptedException { + RetryHandler handler = getRetryHandler(); + FailingCallable callable = new FailingCallable(0, "SUCCESS"); + RetryFuture future = handler.createFirstAttempt(callable, FAST_RETRY_SETTINGS); + future.setAttemptFuture(handler.executeAttempt(callable, future.getAttemptSettings())); + assertEquals("SUCCESS", future.get()); + assertTrue(future.isDone()); + assertFalse(future.isCancelled()); + assertEquals(0, future.getAttemptSettings().getAttemptCount()); + } + + @Test + public void testSuccessWithFailures() throws ExecutionException, InterruptedException { + RetryHandler handler = getRetryHandler(); + FailingCallable callable = new FailingCallable(5, "SUCCESS"); + RetryFuture future = handler.createFirstAttempt(callable, FAST_RETRY_SETTINGS); + future.setAttemptFuture(handler.executeAttempt(callable, future.getAttemptSettings())); + assertEquals("SUCCESS", future.get()); + assertTrue(future.isDone()); + assertFalse(future.isCancelled()); + assertEquals(5, future.getAttemptSettings().getAttemptCount()); + } + + @Test + public void testMaxRetriesExcceeded() { + RetryHandler handler = getRetryHandler(); + FailingCallable callable = new FailingCallable(6, "FAILURE"); + RetryFuture future = handler.createFirstAttempt(callable, FAST_RETRY_SETTINGS); + future.setAttemptFuture(handler.executeAttempt(callable, future.getAttemptSettings())); + + CustomException exception = null; + try { + future.get(); + } catch (Exception e) { + exception = (CustomException) e.getCause(); + } + assertEquals(CustomException.class, exception.getClass()); + + assertEquals(5, future.getAttemptSettings().getAttemptCount()); + assertTrue(future.isDone()); + assertFalse(future.isCancelled()); + } + + @Test + public void testTotalTimeoutExcceeded() throws Exception { + RetryHandler handler = getRetryHandler(); + FailingCallable callable = new FailingCallable(6, "FAILURE"); + RetrySettings retrySettings = + FAST_RETRY_SETTINGS + .toBuilder() + .setInitialRetryDelay(Duration.millis(Integer.MAX_VALUE)) + .setMaxRetryDelay(Duration.millis(Integer.MAX_VALUE)) + .build(); + RetryFuture future = handler.createFirstAttempt(callable, retrySettings); + future.setAttemptFuture(handler.executeAttempt(callable, future.getAttemptSettings())); + + CustomException exception = null; + try { + future.get(); + } catch (Exception e) { + exception = (CustomException) e.getCause(); + } + assertEquals(CustomException.class, exception.getClass()); + assertTrue(future.getAttemptSettings().getAttemptCount() < 4); + assertTrue(future.isDone()); + assertFalse(future.isCancelled()); + } + + @Test(expected = CancellationException.class) + public void testCancelOuterFuture() throws ExecutionException, InterruptedException { + RetryHandler handler = getRetryHandler(); + FailingCallable callable = new FailingCallable(4, "SUCCESS"); + RetrySettings retrySettings = + FAST_RETRY_SETTINGS + .toBuilder() + .setInitialRetryDelay(Duration.millis(1_000L)) + .setMaxRetryDelay(Duration.millis(1_000L)) + .setTotalTimeout(Duration.millis(10_0000L)) + .build(); + + RetryFuture future = handler.createFirstAttempt(callable, retrySettings); + future.cancel(false); + future.setAttemptFuture(handler.executeAttempt(callable, future.getAttemptSettings())); + assertTrue(future.isDone()); + assertTrue(future.isCancelled()); + assertTrue(future.getAttemptSettings().getAttemptCount() < 4); + future.get(); + } +} diff --git a/src/main/java/com/google/api/gax/grpc/DefaultNanoClock.java b/src/test/java/com/google/api/gax/retrying/DirectRetryHandlerTest.java similarity index 78% rename from src/main/java/com/google/api/gax/grpc/DefaultNanoClock.java rename to src/test/java/com/google/api/gax/retrying/DirectRetryHandlerTest.java index d0d308333..9d6070529 100644 --- a/src/main/java/com/google/api/gax/grpc/DefaultNanoClock.java +++ b/src/test/java/com/google/api/gax/retrying/DirectRetryHandlerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2016, Google Inc. All rights reserved. + * Copyright 2017, Google Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are @@ -27,20 +27,17 @@ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -package com.google.api.gax.grpc; +package com.google.api.gax.retrying; -/** - * Default implementation of the NanoClock interface, using call to System.nanoTime(). - */ -public final class DefaultNanoClock implements NanoClock { - public static NanoClock create() { - return new DefaultNanoClock(); - } +import com.google.api.gax.core.DefaultNanoClock; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; - private DefaultNanoClock() {} +@RunWith(JUnit4.class) +public class DirectRetryHandlerTest extends AbstractRetryHandlerTest { @Override - public final long nanoTime() { - return System.nanoTime(); + protected RetryHandler getRetryHandler() { + return new DirectRetryHandler<>(DefaultNanoClock.getDefaultClock()); } } diff --git a/src/test/java/com/google/api/gax/retrying/FailingCallable.java b/src/test/java/com/google/api/gax/retrying/FailingCallable.java new file mode 100644 index 000000000..3b2f0aeac --- /dev/null +++ b/src/test/java/com/google/api/gax/retrying/FailingCallable.java @@ -0,0 +1,72 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.retrying; + +import com.google.api.gax.core.RetrySettings; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import org.joda.time.Duration; + +class FailingCallable implements Callable { + protected static final RetrySettings FAST_RETRY_SETTINGS = + RetrySettings.newBuilder() + .setMaxAttempts(6) + .setInitialRetryDelay(Duration.millis(2L)) + .setRetryDelayMultiplier(1) + .setMaxRetryDelay(Duration.millis(2L)) + .setInitialRpcTimeout(Duration.millis(2L)) + .setRpcTimeoutMultiplier(1) + .setMaxRpcTimeout(Duration.millis(2L)) + .setTotalTimeout(Duration.millis(10L)) + .build(); + + private AtomicInteger attemptsCount = new AtomicInteger(0); + private final int expectedFailuresCount; + private final String result; + + protected FailingCallable(int expectedFailuresCount, String result) { + this.expectedFailuresCount = expectedFailuresCount; + this.result = result; + } + + @Override + public String call() throws Exception { + if (attemptsCount.getAndIncrement() < expectedFailuresCount) { + System.out.println("Throw: " + attemptsCount.get()); + throw new CustomException(); + } + return result; + } + + protected static class CustomException extends RuntimeException { + + private static final long serialVersionUID = -1543459008653697004L; + } +} diff --git a/src/test/java/com/google/api/gax/retrying/ScheduledRetryHandlerTest.java b/src/test/java/com/google/api/gax/retrying/ScheduledRetryHandlerTest.java new file mode 100644 index 000000000..fc30861bb --- /dev/null +++ b/src/test/java/com/google/api/gax/retrying/ScheduledRetryHandlerTest.java @@ -0,0 +1,118 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.retrying; + +import static com.google.api.gax.retrying.FailingCallable.FAST_RETRY_SETTINGS; +import static org.junit.Assert.assertTrue; + +import com.google.api.gax.core.DefaultNanoClock; +import com.google.api.gax.core.RetrySettings; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import org.joda.time.Duration; +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ScheduledRetryHandlerTest extends AbstractRetryHandlerTest { + + private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + private ScheduledRetryHandler retryHandler = + new ScheduledRetryHandler<>(DefaultNanoClock.getDefaultClock(), executorService); + + @After + public void after() { + executorService.shutdownNow(); + } + + @Override + protected RetryHandler getRetryHandler() { + return retryHandler; + } + + @Test(expected = CancellationException.class) + public void testCancelOuterFutureAfterStart() throws ExecutionException, InterruptedException { + RetryHandler handler = getRetryHandler(); + FailingCallable callable = new FailingCallable(4, "SUCCESS"); + RetrySettings retrySettings = + FAST_RETRY_SETTINGS + .toBuilder() + .setInitialRetryDelay(Duration.millis(1_000L)) + .setMaxRetryDelay(Duration.millis(1_000L)) + .setTotalTimeout(Duration.millis(10_0000L)) + .build(); + + RetryFuture future = handler.createFirstAttempt(callable, retrySettings); + future.setAttemptFuture(handler.executeAttempt(callable, future.getAttemptSettings())); + future.cancel(false); + assertTrue(future.isDone()); + assertTrue(future.isCancelled()); + assertTrue(future.getAttemptSettings().getAttemptCount() < 4); + future.get(); + } + + @Test(expected = CancellationException.class) + public void testCancelProxiedFutureAfterStart() throws ExecutionException, InterruptedException { + RetryHandler handler = getRetryHandler(); + FailingCallable callable = new FailingCallable(5, "SUCCESS"); + RetrySettings retrySettings = + FAST_RETRY_SETTINGS + .toBuilder() + .setInitialRetryDelay(Duration.millis(1_000L)) + .setMaxRetryDelay(Duration.millis(1_000L)) + .setTotalTimeout(Duration.millis(10_0000L)) + .build(); + + RetryFuture future = handler.createFirstAttempt(callable, retrySettings); + future.setAttemptFuture(handler.executeAttempt(callable, future.getAttemptSettings())); + Thread.sleep(50L); + + //Note that shutdownNow() will not cancel internal FutureTasks automatically, which + //may potentially cause another thread handing on RetryFuture#get() call forever. + //Canceling the tasks returned by shutdownNow() also does not help, because of missing feature + //in guava's ListenableScheduledFuture, which does not cancel itself, when its delegate is canceled. + //So only the graceful shutdown() is supported properly. + executorService.shutdown(); + + try { + future.get(); + } catch (CancellationException e) { + //Used to wait for cancellation to propagate, so isDone() is guaranteed to return true. + } + assertTrue(future.isDone()); + assertTrue(future.isCancelled()); + assertTrue(future.getAttemptSettings().getAttemptCount() < 4); + future.get(); + } +} diff --git a/version.txt b/version.txt index 66ec8b17f..cf2499627 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.3.1-SNAPSHOT +0.3.2-SNAPSHOT