Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Fix LRO callables so that the ApiCallContext is always passed through. #600

Merged
16 changes: 10 additions & 6 deletions gax/src/main/java/com/google/api/gax/rpc/AttemptCallable.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.NonCancellableFuture;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.common.base.Preconditions;
import java.util.concurrent.Callable;
import org.threeten.bp.Duration;

/**
* A callable representing an attempt to make an RPC call. This class is used from {@link
Expand All @@ -53,23 +55,25 @@ class AttemptCallable<RequestT, ResponseT> implements Callable<ResponseT> {

AttemptCallable(
UnaryCallable<RequestT, ResponseT> callable, RequestT request, ApiCallContext callContext) {
this.callable = callable;
this.request = request;
this.originalCallContext = callContext;
this.callable = Preconditions.checkNotNull(callable);
this.request = Preconditions.checkNotNull(request);
this.originalCallContext = Preconditions.checkNotNull(callContext);
}

public void setExternalFuture(RetryingFuture<ResponseT> externalFuture) {
this.externalFuture = externalFuture;
this.externalFuture = Preconditions.checkNotNull(externalFuture);
}

@Override
public ResponseT call() {
ApiCallContext callContext = originalCallContext;

try {
if (callContext != null) {
callContext = callContext.withTimeout(externalFuture.getAttemptSettings().getRpcTimeout());
Duration rpcTimeout = externalFuture.getAttemptSettings().getRpcTimeout();
if (!rpcTimeout.isZero()) {
callContext = callContext.withTimeout(rpcTimeout);
}

externalFuture.setAttemptFuture(new NonCancellableFuture<ResponseT>());
if (externalFuture.isDone()) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.NonCancellableFuture;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.common.base.Preconditions;
import java.util.concurrent.Callable;
import org.threeten.bp.Duration;

/**
* A callable representing an attempt to check the status of something by issuing a call to a
Expand All @@ -46,25 +48,37 @@
*/
class CheckingAttemptCallable<RequestT, ResponseT> implements Callable<ResponseT> {
private final UnaryCallable<RequestT, ResponseT> callable;
private final ApiCallContext originalCallContext;

private volatile RetryingFuture<ResponseT> externalFuture;

CheckingAttemptCallable(UnaryCallable<RequestT, ResponseT> callable) {
this.callable = callable;
CheckingAttemptCallable(UnaryCallable<RequestT, ResponseT> callable, ApiCallContext callContext) {
this.callable = Preconditions.checkNotNull(callable);
this.originalCallContext = Preconditions.checkNotNull(callContext);
}

public void setExternalFuture(RetryingFuture<ResponseT> externalFuture) {
this.externalFuture = externalFuture;
this.externalFuture = Preconditions.checkNotNull(externalFuture);
}

@Override
public ResponseT call() {
ApiCallContext callContext = originalCallContext;

try {
Duration rpcTimeout = externalFuture.getAttemptSettings().getRpcTimeout();
if (!rpcTimeout.isZero()) {
callContext = callContext.withTimeout(rpcTimeout);
}

externalFuture.setAttemptFuture(new NonCancellableFuture<ResponseT>());
if (externalFuture.isDone()) {
return null;
}
ApiFuture<ResponseT> internalFuture = callable.futureCall(null, null);
// NOTE: The callable here is an OperationCheckingCallable, which will compose its own

This comment was marked as spam.

// request using a resolved operation name and ignore anything that we pass here for the
// request.
ApiFuture<ResponseT> internalFuture = callable.futureCall(null, callContext);
externalFuture.setAttemptFuture(internalFuture);
} catch (Throwable e) {
externalFuture.setAttemptFuture(ApiFutures.<ResponseT>immediateFailedFuture(e));
Expand Down
30 changes: 19 additions & 11 deletions gax/src/main/java/com/google/api/gax/rpc/OperationCallableImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,27 @@ class OperationCallableImpl<RequestT, ResponseT, MetadataT>
* time.
*
* @param request The request to initiate the operation.
* @param context {@link ApiCallContext} to make the call with
* @param callContext {@link ApiCallContext} to make the call with
* @return {@link OperationFuture} for the call result
*/
@Override
public OperationFuture<ResponseT, MetadataT> futureCall(
RequestT request, ApiCallContext context) {
return futureCall(initialCallable.futureCall(request, context));
RequestT request, ApiCallContext callContext) {
ApiFuture<OperationSnapshot> initialFuture = initialCallable.futureCall(request, callContext);
return futureCall(initialFuture, callContext);
}

OperationFutureImpl<ResponseT, MetadataT> futureCall(ApiFuture<OperationSnapshot> initialFuture) {
/** Waits for the initialFuture to resolve and then starts to poll the return operation. */
OperationFutureImpl<ResponseT, MetadataT> futureCall(
ApiFuture<OperationSnapshot> initialFuture, ApiCallContext callContext) {

RecheckingCallable<RequestT, OperationSnapshot> callable =
new RecheckingCallable<>(
new OperationCheckingCallable<RequestT>(longRunningClient, initialFuture), executor);

RetryingFuture<OperationSnapshot> pollingFuture = callable.futureCall(null, null);
// NOTE: OperationCheckingCallable will compose its own request using the resolved
// initialFuture. So the request parameter to futureCall is ignored
RetryingFuture<OperationSnapshot> pollingFuture = callable.futureCall(null, callContext);
return new OperationFutureImpl<>(
pollingFuture, initialFuture, responseTransformer, metadataTransformer);
}
Expand All @@ -98,24 +104,26 @@ OperationFutureImpl<ResponseT, MetadataT> futureCall(ApiFuture<OperationSnapshot
* operation finishes.
*
* @param operationName The name of the operation to resume.
* @param context {@link ApiCallContext} to make the call with
* @param callContext {@link ApiCallContext} to make the call with
* @return {@link OperationFuture} for the call result.
*/
@Override
public OperationFuture<ResponseT, MetadataT> resumeFutureCall(
String operationName, ApiCallContext context) {
return futureCall(longRunningClient.getOperationCallable().futureCall(operationName, context));
String operationName, ApiCallContext callContext) {
ApiFuture<OperationSnapshot> firstAttempt =
longRunningClient.getOperationCallable().futureCall(operationName, callContext);
return futureCall(firstAttempt, callContext);
}

/**
* Sends a cancellation request to the server for the operation with name {@code operationName}.
*
* @param operationName The name of the operation to cancel.
* @param context {@link ApiCallContext} to make the call with
* @param callContext {@link ApiCallContext} to make the call with
* @return the future which completes once the operation is canceled on the server side.
*/
@Override
public ApiFuture<Void> cancel(String operationName, ApiCallContext context) {
return longRunningClient.cancelOperationCallable().futureCall(operationName, context);
public ApiFuture<Void> cancel(String operationName, ApiCallContext callContext) {
return longRunningClient.cancelOperationCallable().futureCall(operationName, callContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ class OperationCheckingCallable<RequestT> extends UnaryCallable<RequestT, Operat
/**
* This method is supposed to be called from {@link AttemptCallable#call()}
*
* @param request request
* @param context call context
* @param ignored request. The request will be composed based on the result of the initialFuture.

This comment was marked as spam.

* @param callContext call context
*/
@Override
public ApiFuture<OperationSnapshot> futureCall(RequestT request, ApiCallContext context) {
public ApiFuture<OperationSnapshot> futureCall(RequestT ignored, ApiCallContext callContext) {

This comment was marked as spam.

This comment was marked as spam.

try {
if (!initialFuture.isDone() || initialFuture.isCancelled()) {
return initialFuture;
Expand All @@ -76,7 +76,9 @@ public ApiFuture<OperationSnapshot> futureCall(RequestT request, ApiCallContext
return initialFuture;
}

return longRunningClient.getOperationCallable().futureCall(initialOperation.getName(), null);
return longRunningClient
.getOperationCallable()
.futureCall(initialOperation.getName(), callContext);
} catch (ExecutionException e) {
return ApiFutures.immediateFailedFuture(e.getCause());
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
* A UnaryCallable that will keep issuing calls to an inner callable until a terminal condition is
* met.
*
* <p>Note: Any request or context passed to this class is ignored.
* <p>Note: Any request passed to this class is ignored.
*
* <p>Package-private for internal use.
*/
Expand All @@ -52,9 +52,9 @@ class RecheckingCallable<RequestT, ResponseT> extends UnaryCallable<RequestT, Re
}

@Override
public RetryingFuture<ResponseT> futureCall(RequestT request, ApiCallContext inputContext) {
public RetryingFuture<ResponseT> futureCall(RequestT ignored, ApiCallContext inputContext) {
CheckingAttemptCallable<RequestT, ResponseT> checkingAttemptCallable =
new CheckingAttemptCallable<>(callable);
new CheckingAttemptCallable<>(callable, inputContext);

RetryingFuture<ResponseT> retryingFuture = executor.createFuture(checkingAttemptCallable);
checkingAttemptCallable.setExternalFuture(retryingFuture);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2016 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.rpc;

import static com.google.common.truth.Truth.assertThat;

import com.google.api.core.SettableApiFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.testing.FakeCallContext;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.threeten.bp.Duration;

@RunWith(MockitoJUnitRunner.class)
public class CheckingAttemptCallableTest {
@Mock UnaryCallable<String, String> mockInnerCallable;
ArgumentCaptor<ApiCallContext> capturedCallContext;
@Mock RetryingFuture<String> mockExternalFuture;
TimedAttemptSettings currentAttemptSettings;

@Before
public void setUp() {
capturedCallContext = ArgumentCaptor.forClass(ApiCallContext.class);
Mockito.when(mockInnerCallable.futureCall(Mockito.<String>any(), capturedCallContext.capture()))
.thenReturn(SettableApiFuture.<String>create());

currentAttemptSettings =
TimedAttemptSettings.newBuilder()
.setGlobalSettings(RetrySettings.newBuilder().build())
.setAttemptCount(0)
.setFirstAttemptStartTimeNanos(0)
.setRetryDelay(Duration.ofSeconds(1))
.setRandomizedRetryDelay(Duration.ofSeconds(1))
.setRpcTimeout(Duration.ZERO)
.build();

Mockito.when(mockExternalFuture.getAttemptSettings())
.thenAnswer(
new Answer<TimedAttemptSettings>() {
@Override
public TimedAttemptSettings answer(InvocationOnMock invocation) throws Throwable {
return currentAttemptSettings;
}
});
}

@Test
public void testRpcTimeout() {
CheckingAttemptCallable<String, String> callable =
new CheckingAttemptCallable<>(mockInnerCallable, FakeCallContext.createDefault());
callable.setExternalFuture(mockExternalFuture);

// Make sure that the rpc timeout is set
Duration timeout = Duration.ofSeconds(10);
currentAttemptSettings = currentAttemptSettings.toBuilder().setRpcTimeout(timeout).build();

callable.call();

assertThat(capturedCallContext.getValue().getTimeout()).isEqualTo(timeout);

// Make sure that subsequent attempts can extend the time out
Duration longerTimeout = Duration.ofSeconds(20);
currentAttemptSettings =
currentAttemptSettings.toBuilder().setRpcTimeout(longerTimeout).build();
callable.call();
assertThat(capturedCallContext.getValue().getTimeout()).isEqualTo(longerTimeout);
}
}
Loading