diff --git a/benchmark/build.gradle b/benchmark/build.gradle index 5b1bfac0f..8f0426ef0 100644 --- a/benchmark/build.gradle +++ b/benchmark/build.gradle @@ -1,6 +1,5 @@ buildscript { repositories { - jcenter() maven { url "https://plugins.gradle.org/m2/" } diff --git a/gax/src/main/java/com/google/api/gax/rpc/AttemptCallable.java b/gax/src/main/java/com/google/api/gax/rpc/AttemptCallable.java index 37fd65aa1..b0ac657e5 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/AttemptCallable.java +++ b/gax/src/main/java/com/google/api/gax/rpc/AttemptCallable.java @@ -33,6 +33,7 @@ import com.google.api.core.ApiFutures; import com.google.api.gax.retrying.NonCancellableFuture; import com.google.api.gax.retrying.RetryingFuture; +import com.google.common.base.Preconditions; import java.util.concurrent.Callable; import org.threeten.bp.Duration; @@ -54,13 +55,13 @@ class AttemptCallable implements Callable { AttemptCallable( UnaryCallable callable, RequestT request, ApiCallContext callContext) { - this.callable = callable; - this.request = request; - this.originalCallContext = callContext; + this.callable = Preconditions.checkNotNull(callable); + this.request = Preconditions.checkNotNull(request); + this.originalCallContext = Preconditions.checkNotNull(callContext); } public void setExternalFuture(RetryingFuture externalFuture) { - this.externalFuture = externalFuture; + this.externalFuture = Preconditions.checkNotNull(externalFuture); } @Override @@ -68,12 +69,11 @@ public ResponseT call() { ApiCallContext callContext = originalCallContext; try { - if (callContext != null) { - Duration rpcTimeout = externalFuture.getAttemptSettings().getRpcTimeout(); - if (!rpcTimeout.isZero()) { - callContext = callContext.withTimeout(rpcTimeout); - } + Duration rpcTimeout = externalFuture.getAttemptSettings().getRpcTimeout(); + if (!rpcTimeout.isZero()) { + callContext = callContext.withTimeout(rpcTimeout); } + externalFuture.setAttemptFuture(new NonCancellableFuture()); if (externalFuture.isDone()) { return null; diff --git a/gax/src/main/java/com/google/api/gax/rpc/CheckingAttemptCallable.java b/gax/src/main/java/com/google/api/gax/rpc/CheckingAttemptCallable.java index 06526c2db..85c2976fa 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/CheckingAttemptCallable.java +++ b/gax/src/main/java/com/google/api/gax/rpc/CheckingAttemptCallable.java @@ -33,7 +33,9 @@ import com.google.api.core.ApiFutures; import com.google.api.gax.retrying.NonCancellableFuture; import com.google.api.gax.retrying.RetryingFuture; +import com.google.common.base.Preconditions; import java.util.concurrent.Callable; +import org.threeten.bp.Duration; /** * A callable representing an attempt to check the status of something by issuing a call to a @@ -46,25 +48,37 @@ */ class CheckingAttemptCallable implements Callable { private final UnaryCallable callable; + private final ApiCallContext originalCallContext; private volatile RetryingFuture externalFuture; - CheckingAttemptCallable(UnaryCallable callable) { - this.callable = callable; + CheckingAttemptCallable(UnaryCallable callable, ApiCallContext callContext) { + this.callable = Preconditions.checkNotNull(callable); + this.originalCallContext = Preconditions.checkNotNull(callContext); } public void setExternalFuture(RetryingFuture externalFuture) { - this.externalFuture = externalFuture; + this.externalFuture = Preconditions.checkNotNull(externalFuture); } @Override public ResponseT call() { + ApiCallContext callContext = originalCallContext; + try { + Duration rpcTimeout = externalFuture.getAttemptSettings().getRpcTimeout(); + if (!rpcTimeout.isZero()) { + callContext = callContext.withTimeout(rpcTimeout); + } + externalFuture.setAttemptFuture(new NonCancellableFuture()); if (externalFuture.isDone()) { return null; } - ApiFuture internalFuture = callable.futureCall(null, null); + // NOTE: The callable here is an OperationCheckingCallable, which will compose its own + // request using a resolved operation name and ignore anything that we pass here for the + // request. + ApiFuture internalFuture = callable.futureCall(null, callContext); externalFuture.setAttemptFuture(internalFuture); } catch (Throwable e) { externalFuture.setAttemptFuture(ApiFutures.immediateFailedFuture(e)); diff --git a/gax/src/main/java/com/google/api/gax/rpc/OperationCallableImpl.java b/gax/src/main/java/com/google/api/gax/rpc/OperationCallableImpl.java index 2985052a7..aa9eee9be 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/OperationCallableImpl.java +++ b/gax/src/main/java/com/google/api/gax/rpc/OperationCallableImpl.java @@ -73,21 +73,27 @@ class OperationCallableImpl * time. * * @param request The request to initiate the operation. - * @param context {@link ApiCallContext} to make the call with + * @param callContext {@link ApiCallContext} to make the call with * @return {@link OperationFuture} for the call result */ @Override public OperationFuture futureCall( - RequestT request, ApiCallContext context) { - return futureCall(initialCallable.futureCall(request, context)); + RequestT request, ApiCallContext callContext) { + ApiFuture initialFuture = initialCallable.futureCall(request, callContext); + return futureCall(initialFuture, callContext); } - OperationFutureImpl futureCall(ApiFuture initialFuture) { + /** Waits for the initialFuture to resolve and then starts to poll the return operation. */ + OperationFutureImpl futureCall( + ApiFuture initialFuture, ApiCallContext callContext) { + RecheckingCallable callable = new RecheckingCallable<>( new OperationCheckingCallable(longRunningClient, initialFuture), executor); - RetryingFuture pollingFuture = callable.futureCall(null, null); + // NOTE: OperationCheckingCallable will compose its own request using the resolved + // initialFuture. So the request parameter to futureCall is ignored + RetryingFuture pollingFuture = callable.futureCall(null, callContext); return new OperationFutureImpl<>( pollingFuture, initialFuture, responseTransformer, metadataTransformer); } @@ -98,24 +104,26 @@ OperationFutureImpl futureCall(ApiFuture resumeFutureCall( - String operationName, ApiCallContext context) { - return futureCall(longRunningClient.getOperationCallable().futureCall(operationName, context)); + String operationName, ApiCallContext callContext) { + ApiFuture firstAttempt = + longRunningClient.getOperationCallable().futureCall(operationName, callContext); + return futureCall(firstAttempt, callContext); } /** * Sends a cancellation request to the server for the operation with name {@code operationName}. * * @param operationName The name of the operation to cancel. - * @param context {@link ApiCallContext} to make the call with + * @param callContext {@link ApiCallContext} to make the call with * @return the future which completes once the operation is canceled on the server side. */ @Override - public ApiFuture cancel(String operationName, ApiCallContext context) { - return longRunningClient.cancelOperationCallable().futureCall(operationName, context); + public ApiFuture cancel(String operationName, ApiCallContext callContext) { + return longRunningClient.cancelOperationCallable().futureCall(operationName, callContext); } } diff --git a/gax/src/main/java/com/google/api/gax/rpc/OperationCheckingCallable.java b/gax/src/main/java/com/google/api/gax/rpc/OperationCheckingCallable.java index 71c7773eb..302e0fbae 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/OperationCheckingCallable.java +++ b/gax/src/main/java/com/google/api/gax/rpc/OperationCheckingCallable.java @@ -59,11 +59,12 @@ class OperationCheckingCallable extends UnaryCallable futureCall(RequestT request, ApiCallContext context) { + public ApiFuture futureCall(RequestT ignored, ApiCallContext callContext) { try { if (!initialFuture.isDone() || initialFuture.isCancelled()) { return initialFuture; @@ -71,12 +72,13 @@ public ApiFuture futureCall(RequestT request, ApiCallContext // Since initialFuture is done at this point, the following call should be non-blocking OperationSnapshot initialOperation = initialFuture.get(); - // Note Future.isDone() and Operation.getDone() are two fundamentally different things. if (initialOperation.isDone()) { return initialFuture; } - return longRunningClient.getOperationCallable().futureCall(initialOperation.getName(), null); + return longRunningClient + .getOperationCallable() + .futureCall(initialOperation.getName(), callContext); } catch (ExecutionException e) { return ApiFutures.immediateFailedFuture(e.getCause()); } catch (InterruptedException e) { diff --git a/gax/src/main/java/com/google/api/gax/rpc/RecheckingCallable.java b/gax/src/main/java/com/google/api/gax/rpc/RecheckingCallable.java index 7950aeb2c..5f75323fb 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/RecheckingCallable.java +++ b/gax/src/main/java/com/google/api/gax/rpc/RecheckingCallable.java @@ -37,7 +37,7 @@ * A UnaryCallable that will keep issuing calls to an inner callable until a terminal condition is * met. * - *

Note: Any request or context passed to this class is ignored. + *

Note: Any request passed to this class is ignored. * *

Package-private for internal use. */ @@ -52,9 +52,9 @@ class RecheckingCallable extends UnaryCallable futureCall(RequestT request, ApiCallContext inputContext) { + public RetryingFuture futureCall(RequestT ignored, ApiCallContext inputContext) { CheckingAttemptCallable checkingAttemptCallable = - new CheckingAttemptCallable<>(callable); + new CheckingAttemptCallable<>(callable, inputContext); RetryingFuture retryingFuture = executor.createFuture(checkingAttemptCallable); checkingAttemptCallable.setExternalFuture(retryingFuture); diff --git a/gax/src/test/java/com/google/api/gax/rpc/CheckingAttemptCallableTest.java b/gax/src/test/java/com/google/api/gax/rpc/CheckingAttemptCallableTest.java new file mode 100644 index 000000000..650f1590f --- /dev/null +++ b/gax/src/test/java/com/google/api/gax/rpc/CheckingAttemptCallableTest.java @@ -0,0 +1,104 @@ +/* + * Copyright 2016 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.rpc; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.retrying.RetryingFuture; +import com.google.api.gax.retrying.TimedAttemptSettings; +import com.google.api.gax.rpc.testing.FakeCallContext; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; +import org.threeten.bp.Duration; + +@RunWith(MockitoJUnitRunner.class) +public class CheckingAttemptCallableTest { + @Mock UnaryCallable mockInnerCallable; + ArgumentCaptor capturedCallContext; + @Mock RetryingFuture mockExternalFuture; + TimedAttemptSettings currentAttemptSettings; + + @Before + public void setUp() { + capturedCallContext = ArgumentCaptor.forClass(ApiCallContext.class); + Mockito.when(mockInnerCallable.futureCall(Mockito.any(), capturedCallContext.capture())) + .thenReturn(SettableApiFuture.create()); + + currentAttemptSettings = + TimedAttemptSettings.newBuilder() + .setGlobalSettings(RetrySettings.newBuilder().build()) + .setAttemptCount(0) + .setFirstAttemptStartTimeNanos(0) + .setRetryDelay(Duration.ofSeconds(1)) + .setRandomizedRetryDelay(Duration.ofSeconds(1)) + .setRpcTimeout(Duration.ZERO) + .build(); + + Mockito.when(mockExternalFuture.getAttemptSettings()) + .thenAnswer( + new Answer() { + @Override + public TimedAttemptSettings answer(InvocationOnMock invocation) throws Throwable { + return currentAttemptSettings; + } + }); + } + + @Test + public void testRpcTimeout() { + CheckingAttemptCallable callable = + new CheckingAttemptCallable<>(mockInnerCallable, FakeCallContext.createDefault()); + callable.setExternalFuture(mockExternalFuture); + + // Make sure that the rpc timeout is set + Duration timeout = Duration.ofSeconds(10); + currentAttemptSettings = currentAttemptSettings.toBuilder().setRpcTimeout(timeout).build(); + + callable.call(); + + assertThat(capturedCallContext.getValue().getTimeout()).isEqualTo(timeout); + + // Make sure that subsequent attempts can extend the time out + Duration longerTimeout = Duration.ofSeconds(20); + currentAttemptSettings = + currentAttemptSettings.toBuilder().setRpcTimeout(longerTimeout).build(); + callable.call(); + assertThat(capturedCallContext.getValue().getTimeout()).isEqualTo(longerTimeout); + } +} diff --git a/gax/src/test/java/com/google/api/gax/rpc/OperationCallableImplTest.java b/gax/src/test/java/com/google/api/gax/rpc/OperationCallableImplTest.java index 781e39362..b4121ba1e 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/OperationCallableImplTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/OperationCallableImplTest.java @@ -52,11 +52,13 @@ import com.google.api.gax.rpc.testing.FakeStatusCode; import com.google.api.gax.rpc.testing.FakeTransportChannel; import com.google.auth.Credentials; +import com.google.common.collect.Lists; import com.google.common.truth.Truth; import com.google.common.util.concurrent.Futures; import java.awt.Color; import java.io.IOException; import java.util.Currency; +import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -68,6 +70,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.threeten.bp.Duration; @@ -374,7 +377,8 @@ public void testFutureCallInitialCancel() throws Exception { OperationFutureImpl future = callableImpl.futureCall( new ListenableFutureToApiFuture<>( - Futures.immediateCancelledFuture())); + Futures.immediateCancelledFuture()), + FakeCallContext.createDefault()); Exception exception = null; try { @@ -408,10 +412,9 @@ public void testFutureCallInitialOperationUnexpectedFail() throws Exception { RuntimeException thrownException = new RuntimeException(); + ApiFuture initialFuture = ApiFutures.immediateFailedFuture(thrownException); OperationFuture future = - callableImpl.futureCall( - new ListenableFutureToApiFuture<>( - Futures.immediateFailedFuture(thrownException))); + callableImpl.futureCall(initialFuture, FakeCallContext.createDefault()); assertFutureFailMetaFail(future, RuntimeException.class, null); assertThat(executor.getIterationsCount()).isEqualTo(0); @@ -464,6 +467,94 @@ public void testFutureCallPollDoneOnSecond() throws Exception { assertThat(executor.getIterationsCount()).isEqualTo(1); } + @Test + public void testFutureCallPollRPCTimeout() throws Exception { + String opName = "testFutureCallPollRPCTimeout"; + pollingAlgorithm = + OperationTimedPollAlgorithm.create( + FAST_RECHECKING_SETTINGS + .toBuilder() + .setInitialRpcTimeout(Duration.ofMillis(100)) + .setMaxRpcTimeout(Duration.ofSeconds(1)) + .setRpcTimeoutMultiplier(2) + .build(), + clock); + callSettings = callSettings.toBuilder().setPollingAlgorithm(pollingAlgorithm).build(); + + Color resp = getColor(0.5f); + Currency meta1 = Currency.getInstance("UAH"); + Currency meta2 = Currency.getInstance("USD"); + OperationSnapshot initialOperation = getOperation(opName, null, null, null, false); + OperationSnapshot resultOperation1 = getOperation(opName, null, null, meta1, false); + OperationSnapshot resultOperation2 = getOperation(opName, null, null, meta1, false); + OperationSnapshot resultOperation3 = getOperation(opName, resp, null, meta2, true); + + UnaryCallable initialCallable = + mockGetOpSnapshotCallable(StatusCode.Code.OK, initialOperation); + + LongRunningClient longRunningClient = Mockito.mock(LongRunningClient.class); + @SuppressWarnings("unchecked") + UnaryCallable getOpCallable = Mockito.mock(UnaryCallable.class); + ArgumentCaptor callContextCaptor = + ArgumentCaptor.forClass(ApiCallContext.class); + Mockito.when(longRunningClient.getOperationCallable()).thenReturn(getOpCallable); + + Mockito.when(getOpCallable.futureCall(Mockito.any(), callContextCaptor.capture())) + .thenReturn(ApiFutures.immediateFuture(resultOperation1)) + .thenReturn(ApiFutures.immediateFuture(resultOperation2)) + .thenReturn(ApiFutures.immediateFuture(resultOperation3)); + + OperationCallable callable = + FakeCallableFactory.createOperationCallable( + initialCallable, callSettings, initialContext, longRunningClient); + + callable.futureCall(2, FakeCallContext.createDefault()).get(10, TimeUnit.SECONDS); + + List actualTimeouts = Lists.newArrayList(); + + for (ApiCallContext callContext : callContextCaptor.getAllValues()) { + actualTimeouts.add(callContext.getTimeout()); + } + assertThat(actualTimeouts).containsAllOf(Duration.ofMillis(100), Duration.ofMillis(200)); + } + + @Test + public void testFutureCallContextPropagation() throws Exception { + // Note: there is a bug in Rechecking callable that will return the initial RPC timeouts + // twice. So, this test works around the issue by polling 3 times and checking for the first + // 2 timeout durations + String opName = "testFutureCallContextPropagation"; + + Color resp = getColor(0.5f); + Currency meta1 = Currency.getInstance("UAH"); + Currency meta2 = Currency.getInstance("USD"); + OperationSnapshot initialOperation = getOperation(opName, null, null, null, false); + OperationSnapshot resultOperation = getOperation(opName, resp, null, meta2, true); + + UnaryCallable initialCallable = + mockGetOpSnapshotCallable(StatusCode.Code.OK, initialOperation); + + LongRunningClient longRunningClient = Mockito.mock(LongRunningClient.class); + @SuppressWarnings("unchecked") + UnaryCallable getOpCallable = Mockito.mock(UnaryCallable.class); + ArgumentCaptor callContextCaptor = + ArgumentCaptor.forClass(ApiCallContext.class); + Mockito.when(longRunningClient.getOperationCallable()).thenReturn(getOpCallable); + + Mockito.when(getOpCallable.futureCall(Mockito.any(), callContextCaptor.capture())) + .thenReturn(ApiFutures.immediateFuture(resultOperation)); + + OperationCallable callable = + FakeCallableFactory.createOperationCallable( + initialCallable, callSettings, initialContext, longRunningClient); + + ApiCallContext callContext = FakeCallContext.createDefault().withTimeout(Duration.ofMillis(10)); + + callable.futureCall(2, callContext).get(10, TimeUnit.SECONDS); + + assertThat(callContextCaptor.getValue().getTimeout()).isEqualTo(Duration.ofMillis(10)); + } + @Test public void testFutureCallPollDoneOnMany() throws Exception { final int iterationsCount = 1000;