From 2de0adbdb0983823e95a03a7788f69248afb5eb4 Mon Sep 17 00:00:00 2001 From: Enis Sert Date: Wed, 18 Aug 2021 16:43:24 -0700 Subject: [PATCH] fix: retry certain RESOURCE_EXHAUSTED errors Handle certain RESOURCE_EXHAUSTED errors and report the retry attempts. --- .../cloud/bigquery/storage/util/Errors.java | 39 ++++++ .../storage/v1/BigQueryReadClient.java | 3 +- .../storage/v1/BigQueryReadSettings.java | 34 ++++- .../v1/stub/EnhancedBigQueryReadStub.java | 21 ++- .../readrows/ApiResultRetryAlgorithm.java | 35 ++++- .../v1beta1/BigQueryStorageClient.java | 3 +- .../v1beta1/BigQueryStorageSettings.java | 34 ++++- .../stub/EnhancedBigQueryStorageStub.java | 19 ++- .../readrows/ApiResultRetryAlgorithm.java | 35 ++++- .../storage/v1beta2/BigQueryReadClient.java | 3 +- .../storage/v1beta2/BigQueryReadSettings.java | 34 ++++- .../stub/EnhancedBigQueryReadStub.java | 21 ++- .../readrows/ApiResultRetryAlgorithm.java | 35 ++++- .../bigquery/storage/util/ErrorsTest.java | 78 +++++++++++ .../storage/v1/BigQueryReadClientTest.java | 123 +++++++++++++++++- .../v1beta1/BigQueryStorageClientTest.java | 123 +++++++++++++++++- .../v1beta2/BigQueryReadClientTest.java | 123 +++++++++++++++++- 17 files changed, 731 insertions(+), 32 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/util/Errors.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/util/Errors.java index 067f8d242d..8f782161d6 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/util/Errors.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/util/Errors.java @@ -15,12 +15,51 @@ */ package com.google.cloud.bigquery.storage.util; +import com.google.rpc.RetryInfo; +import io.grpc.Metadata; import io.grpc.Status; +import io.grpc.protobuf.ProtoUtils; +import org.threeten.bp.Duration; /** Static utility methods for working with Errors returned from the service. */ public class Errors { private Errors() {}; + public static class IsRetryableStatusResult { + public boolean isRetryable = false; + public Duration retryDelay = null; + } + + private static final Metadata.Key KEY_RETRY_INFO = + ProtoUtils.keyForProto(RetryInfo.getDefaultInstance()); + + /** + * Returns true iff the Status indicates an error that is retryable. + * + *

Generally, internal errors are not considered retryable, however there are certain transient + * network issues that appear as internal but are in fact retryable. + * + *

Resource exhausted errors are only considered retryable if metadata contains a serialized + * RetryInfo object. + */ + public static IsRetryableStatusResult isRetryableStatus(Status status, Metadata metadata) { + IsRetryableStatusResult result = new IsRetryableStatusResult(); + + result.isRetryable = isRetryableInternalStatus(status); + if (!result.isRetryable && + status.getCode() == Status.Code.RESOURCE_EXHAUSTED && + metadata != null && + metadata.containsKey(KEY_RETRY_INFO)) { + RetryInfo retryInfo = metadata.get(KEY_RETRY_INFO); + if (retryInfo.hasRetryDelay()) { + result.isRetryable = true; + result.retryDelay = Duration.ofSeconds(retryInfo.getRetryDelay().getSeconds(), retryInfo.getRetryDelay().getNanos()); + } + } + + return result; + } + /** * Returns true iff the Status indicates and internal error that is retryable. * diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java index e3fbf56165..23d9c3b6fe 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java @@ -126,7 +126,8 @@ public static final BigQueryReadClient create(EnhancedBigQueryReadStub stub) { */ protected BigQueryReadClient(BigQueryReadSettings settings) throws IOException { this.settings = settings; - this.stub = EnhancedBigQueryReadStub.create(settings.getTypedStubSettings()); + this.stub = EnhancedBigQueryReadStub.create( + settings.getTypedStubSettings(), settings.getReadRowsRetryAttemptListener()); } @BetaApi("A restructuring of stub classes is planned, so this may break in the future") diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java index fcf02a2331..60532dce30 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java @@ -29,6 +29,8 @@ import com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStubSettings; import java.io.IOException; import java.util.List; +import io.grpc.Metadata; +import io.grpc.Status; /** * Settings class to configure an instance of {@link BigQueryReadClient}. @@ -69,6 +71,27 @@ public ServerStreamingCallSettings readRowsSe return getTypedStubSettings().readRowsSettings(); } + public static interface RetryAttemptListener { + public void onRetryAttempt(Status prevStatus, Metadata prevMetadata); + } + + private RetryAttemptListener readRowsRetryAttemptListener = null; + + /** + * If a non null readRowsRetryAttemptListener is provided, client will call onRetryAtempt function + * before a failed ReadRows request is retried. This can be used as negative feedback mechanism + * for future decision to split read streams because some retried failures are due to resource + * exhaustion that increased parallelism only makes it worse. + */ + public void setReadRowsRetryAttemptListener( + RetryAttemptListener readRowsRetryAttemptListener) { + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; + } + + public RetryAttemptListener getReadRowsRetryAttemptListener() { + return readRowsRetryAttemptListener; + } + /** Returns the object with the settings used for calls to splitReadStream. */ public UnaryCallSettings splitReadStreamSettings() { @@ -176,6 +199,13 @@ public Builder applyToAllUnaryMethods( return this; } + private RetryAttemptListener readRowsRetryAttemptListener = null; + + public Builder setReadRowsRetryAttemptListener(RetryAttemptListener readRowsRetryAttemptListener) { + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; + return this; + } + /** Returns the builder for the settings used for calls to createReadSession. */ public UnaryCallSettings.Builder createReadSessionSettings() { @@ -196,7 +226,9 @@ public Builder applyToAllUnaryMethods( @Override public BigQueryReadSettings build() throws IOException { - return new BigQueryReadSettings(this); + BigQueryReadSettings settings = new BigQueryReadSettings(this); + settings.setReadRowsRetryAttemptListener(readRowsRetryAttemptListener); + return settings; } } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java index 3d8e3ea0ff..46070d65c7 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java @@ -31,6 +31,7 @@ import com.google.api.gax.tracing.SpanName; import com.google.api.gax.tracing.TracedServerStreamingCallable; import com.google.cloud.bigquery.storage.v1.BigQueryReadGrpc; +import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings; import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; @@ -54,9 +55,17 @@ public class EnhancedBigQueryReadStub implements BackgroundResource { private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage"; private final GrpcBigQueryReadStub stub; private final BigQueryReadStubSettings stubSettings; + private final BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener; private final ClientContext context; - public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings settings) + public static EnhancedBigQueryReadStub create( + EnhancedBigQueryReadStubSettings settings) throws IOException { + return create(settings, null); + } + + public static EnhancedBigQueryReadStub create( + EnhancedBigQueryReadStubSettings settings, + BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener) throws IOException { // Configure the base settings. BigQueryReadStubSettings.Builder baseSettingsBuilder = @@ -88,14 +97,18 @@ public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings s BigQueryReadStubSettings baseSettings = baseSettingsBuilder.build(); ClientContext clientContext = ClientContext.create(baseSettings); GrpcBigQueryReadStub stub = new GrpcBigQueryReadStub(baseSettings, clientContext); - return new EnhancedBigQueryReadStub(stub, baseSettings, clientContext); + return new EnhancedBigQueryReadStub(stub, baseSettings, readRowsRetryAttemptListener, clientContext); } @InternalApi("Visible for testing") EnhancedBigQueryReadStub( - GrpcBigQueryReadStub stub, BigQueryReadStubSettings stubSettings, ClientContext context) { + GrpcBigQueryReadStub stub, + BigQueryReadStubSettings stubSettings, + BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener, + ClientContext context) { this.stub = stub; this.stubSettings = stubSettings; + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; this.context = context; } @@ -123,7 +136,7 @@ public Map extract(ReadRowsRequest request) { StreamingRetryAlgorithm retryAlgorithm = new StreamingRetryAlgorithm<>( - new ApiResultRetryAlgorithm(), + new ApiResultRetryAlgorithm(readRowsRetryAttemptListener), new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock())); ScheduledRetryingExecutor retryingExecutor = diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ApiResultRetryAlgorithm.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ApiResultRetryAlgorithm.java index 6e1269ae07..10bb440e5a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ApiResultRetryAlgorithm.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ApiResultRetryAlgorithm.java @@ -21,6 +21,8 @@ import com.google.api.gax.retrying.TimedAttemptSettings; import com.google.api.gax.rpc.ApiException; import com.google.cloud.bigquery.storage.util.Errors; +import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings; +import io.grpc.Metadata; import io.grpc.Status; import org.threeten.bp.Duration; @@ -30,17 +32,41 @@ public class ApiResultRetryAlgorithm implements ResultRetryAlgorithm< // Duration to sleep on if the error is DEADLINE_EXCEEDED. public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1); + private final BigQueryReadSettings.RetryAttemptListener retryAttemptListener; + + public ApiResultRetryAlgorithm() { + this(null); + } + + public ApiResultRetryAlgorithm(BigQueryReadSettings.RetryAttemptListener retryAttemptListener) { + super(); + this.retryAttemptListener = retryAttemptListener; + } + @Override public TimedAttemptSettings createNextAttempt( Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (Errors.isRetryableInternalStatus(status)) { + Metadata metadata = Status.trailersFromThrowable(prevThrowable); + Errors.IsRetryableStatusResult result = Errors.isRetryableStatus(status, metadata); + if (result.isRetryable) { + // If result.retryDelay isn't null, we know exactly how long we must wait, so both regular + // and randomized delays are the same. + Duration retryDelay = result.retryDelay; + Duration randomizedRetryDelay = result.retryDelay; + if (retryDelay == null) { + retryDelay = prevSettings.getRetryDelay(); + randomizedRetryDelay = DEADLINE_SLEEP_DURATION; + } + if (retryAttemptListener != null) { + retryAttemptListener.onRetryAttempt(status, metadata); + } return TimedAttemptSettings.newBuilder() .setGlobalSettings(prevSettings.getGlobalSettings()) - .setRetryDelay(prevSettings.getRetryDelay()) + .setRetryDelay(retryDelay) .setRpcTimeout(prevSettings.getRpcTimeout()) - .setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION) + .setRandomizedRetryDelay(randomizedRetryDelay) .setAttemptCount(prevSettings.getAttemptCount() + 1) .setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos()) .build(); @@ -53,7 +79,8 @@ public TimedAttemptSettings createNextAttempt( public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (Errors.isRetryableInternalStatus(status)) { + Metadata metadata = Status.trailersFromThrowable(prevThrowable); + if (Errors.isRetryableStatus(status, metadata).isRetryable) { return true; } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClient.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClient.java index ec5b048992..cf60019d74 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClient.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClient.java @@ -141,7 +141,8 @@ public static final BigQueryStorageClient create(EnhancedBigQueryStorageStub stu */ protected BigQueryStorageClient(BigQueryStorageSettings settings) throws IOException { this.settings = settings; - this.stub = EnhancedBigQueryStorageStub.create(settings.getTypedStubSettings()); + this.stub = EnhancedBigQueryStorageStub.create( + settings.getTypedStubSettings(), settings.getReadRowsRetryAttemptListener()); } @BetaApi("A restructuring of stub classes is planned, so this may break in the future") diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageSettings.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageSettings.java index 172ee7fa5a..3b69149032 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageSettings.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageSettings.java @@ -39,6 +39,8 @@ import com.google.protobuf.Empty; import java.io.IOException; import java.util.List; +import io.grpc.Metadata; +import io.grpc.Status; /** * Settings class to configure an instance of {@link BigQueryStorageClient}. @@ -78,6 +80,27 @@ public ServerStreamingCallSettings readRowsSe return getTypedStubSettings().readRowsSettings(); } + public static interface RetryAttemptListener { + public void onRetryAttempt(Status prevStatus, Metadata prevMetadata); + } + + private RetryAttemptListener readRowsRetryAttemptListener = null; + + /** + * If a non null readRowsRetryAttemptListener is provided, client will call onRetryAtempt function + * before a failed ReadRows request is retried. This can be used as negative feedback mechanism + * for future decision to split read streams because some retried failures are due to resource + * exhaustion that increased parallelism only makes it worse. + */ + public void setReadRowsRetryAttemptListener( + RetryAttemptListener readRowsRetryAttemptListener) { + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; + } + + public RetryAttemptListener getReadRowsRetryAttemptListener() { + return readRowsRetryAttemptListener; + } + /** Returns the object with the settings used for calls to batchCreateReadSessionStreams. */ public UnaryCallSettings< BatchCreateReadSessionStreamsRequest, BatchCreateReadSessionStreamsResponse> @@ -197,6 +220,13 @@ public Builder applyToAllUnaryMethods( return this; } + private RetryAttemptListener readRowsRetryAttemptListener = null; + + public Builder setReadRowsRetryAttemptListener(RetryAttemptListener readRowsRetryAttemptListener) { + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; + return this; + } + /** Returns the builder for the settings used for calls to createReadSession. */ public UnaryCallSettings.Builder createReadSessionSettings() { @@ -229,7 +259,9 @@ public UnaryCallSettings.Builder finalizeStreamSet @Override public BigQueryStorageSettings build() throws IOException { - return new BigQueryStorageSettings(this); + BigQueryStorageSettings settings = new BigQueryStorageSettings(this); + settings.setReadRowsRetryAttemptListener(readRowsRetryAttemptListener); + return settings; } } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/EnhancedBigQueryStorageStub.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/EnhancedBigQueryStorageStub.java index 5f449adf8f..a72d7a2d1c 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/EnhancedBigQueryStorageStub.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/EnhancedBigQueryStorageStub.java @@ -31,6 +31,7 @@ import com.google.api.gax.tracing.SpanName; import com.google.api.gax.tracing.TracedServerStreamingCallable; import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageGrpc; +import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageSettings; import com.google.cloud.bigquery.storage.v1beta1.Storage.BatchCreateReadSessionStreamsRequest; import com.google.cloud.bigquery.storage.v1beta1.Storage.BatchCreateReadSessionStreamsResponse; import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest; @@ -58,9 +59,18 @@ public class EnhancedBigQueryStorageStub implements BackgroundResource { private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage"; private final GrpcBigQueryStorageStub stub; private final BigQueryStorageStubSettings stubSettings; + private final BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener; private final ClientContext context; - public static EnhancedBigQueryStorageStub create(EnhancedBigQueryStorageStubSettings settings) + + public static EnhancedBigQueryStorageStub create( + EnhancedBigQueryStorageStubSettings settings) throws IOException { + return create(settings, null); + } + + public static EnhancedBigQueryStorageStub create( + EnhancedBigQueryStorageStubSettings settings, + BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener) throws IOException { // Configure the base settings. BigQueryStorageStubSettings.Builder baseSettingsBuilder = @@ -107,16 +117,19 @@ public static EnhancedBigQueryStorageStub create(EnhancedBigQueryStorageStubSett BigQueryStorageStubSettings baseSettings = baseSettingsBuilder.build(); ClientContext clientContext = ClientContext.create(baseSettings); GrpcBigQueryStorageStub stub = new GrpcBigQueryStorageStub(baseSettings, clientContext); - return new EnhancedBigQueryStorageStub(stub, baseSettings, clientContext); + return new EnhancedBigQueryStorageStub( + stub, baseSettings, readRowsRetryAttemptListener, clientContext); } @InternalApi("Visible for testing") EnhancedBigQueryStorageStub( GrpcBigQueryStorageStub stub, BigQueryStorageStubSettings stubSettings, + BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener, ClientContext context) { this.stub = stub; this.stubSettings = stubSettings; + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; this.context = context; } @@ -145,7 +158,7 @@ public Map extract(ReadRowsRequest request) { StreamingRetryAlgorithm retryAlgorithm = new StreamingRetryAlgorithm<>( - new ApiResultRetryAlgorithm(), + new ApiResultRetryAlgorithm(readRowsRetryAttemptListener), new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock())); ScheduledRetryingExecutor retryingExecutor = diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ApiResultRetryAlgorithm.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ApiResultRetryAlgorithm.java index d9cf557a76..af0cfe84ab 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ApiResultRetryAlgorithm.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ApiResultRetryAlgorithm.java @@ -21,6 +21,8 @@ import com.google.api.gax.retrying.TimedAttemptSettings; import com.google.api.gax.rpc.ApiException; import com.google.cloud.bigquery.storage.util.Errors; +import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageSettings; +import io.grpc.Metadata; import io.grpc.Status; import org.threeten.bp.Duration; @@ -30,17 +32,41 @@ public class ApiResultRetryAlgorithm implements ResultRetryAlgorithm< // Duration to sleep on if the error is DEADLINE_EXCEEDED. public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1); + private final BigQueryStorageSettings.RetryAttemptListener retryAttemptListener; + + public ApiResultRetryAlgorithm() { + this(null); + } + + public ApiResultRetryAlgorithm(BigQueryStorageSettings.RetryAttemptListener retryAttemptListener) { + super(); + this.retryAttemptListener = retryAttemptListener; + } + @Override public TimedAttemptSettings createNextAttempt( Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (Errors.isRetryableInternalStatus(status)) { + Metadata metadata = Status.trailersFromThrowable(prevThrowable); + Errors.IsRetryableStatusResult result = Errors.isRetryableStatus(status, metadata); + if (result.isRetryable) { + // If result.retryDelay isn't null, we know exactly how long we must wait, so both regular + // and randomized delays are the same. + Duration retryDelay = result.retryDelay; + Duration randomizedRetryDelay = result.retryDelay; + if (retryDelay == null) { + retryDelay = prevSettings.getRetryDelay(); + randomizedRetryDelay = DEADLINE_SLEEP_DURATION; + } + if (retryAttemptListener != null) { + retryAttemptListener.onRetryAttempt(status, metadata); + } return TimedAttemptSettings.newBuilder() .setGlobalSettings(prevSettings.getGlobalSettings()) - .setRetryDelay(prevSettings.getRetryDelay()) + .setRetryDelay(retryDelay) .setRpcTimeout(prevSettings.getRpcTimeout()) - .setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION) + .setRandomizedRetryDelay(randomizedRetryDelay) .setAttemptCount(prevSettings.getAttemptCount() + 1) .setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos()) .build(); @@ -53,7 +79,8 @@ public TimedAttemptSettings createNextAttempt( public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (Errors.isRetryableInternalStatus(status)) { + Metadata metadata = Status.trailersFromThrowable(prevThrowable); + if (Errors.isRetryableStatus(status, metadata).isRetryable) { return true; } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClient.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClient.java index 23d61233df..48abeb89da 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClient.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClient.java @@ -126,7 +126,8 @@ public static final BigQueryReadClient create(EnhancedBigQueryReadStub stub) { */ protected BigQueryReadClient(BigQueryReadSettings settings) throws IOException { this.settings = settings; - this.stub = EnhancedBigQueryReadStub.create(settings.getTypedStubSettings()); + this.stub = EnhancedBigQueryReadStub.create( + settings.getTypedStubSettings(), settings.getReadRowsRetryAttemptListener()); } @BetaApi("A restructuring of stub classes is planned, so this may break in the future") diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadSettings.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadSettings.java index f18c9e19c6..de1a0a5988 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadSettings.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadSettings.java @@ -29,6 +29,8 @@ import com.google.cloud.bigquery.storage.v1beta2.stub.EnhancedBigQueryReadStubSettings; import java.io.IOException; import java.util.List; +import io.grpc.Metadata; +import io.grpc.Status; /** * Settings class to configure an instance of {@link BigQueryReadClient}. @@ -69,6 +71,27 @@ public ServerStreamingCallSettings readRowsSe return getTypedStubSettings().readRowsSettings(); } + public static interface RetryAttemptListener { + public void onRetryAttempt(Status prevStatus, Metadata prevMetadata); + } + + private RetryAttemptListener readRowsRetryAttemptListener = null; + + /** + * If a non null readRowsRetryAttemptListener is provided, client will call onRetryAtempt function + * before a failed ReadRows request is retried. This can be used as negative feedback mechanism + * for future decision to split read streams because some retried failures are due to resource + * exhaustion that increased parallelism only makes it worse. + */ + public void setReadRowsRetryAttemptListener( + RetryAttemptListener readRowsRetryAttemptListener) { + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; + } + + public RetryAttemptListener getReadRowsRetryAttemptListener() { + return readRowsRetryAttemptListener; + } + /** Returns the object with the settings used for calls to splitReadStream. */ public UnaryCallSettings splitReadStreamSettings() { @@ -176,6 +199,13 @@ public Builder applyToAllUnaryMethods( return this; } + private RetryAttemptListener readRowsRetryAttemptListener = null; + + public Builder setReadRowsRetryAttemptListener(RetryAttemptListener readRowsRetryAttemptListener) { + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; + return this; + } + /** Returns the builder for the settings used for calls to createReadSession. */ public UnaryCallSettings.Builder createReadSessionSettings() { @@ -196,7 +226,9 @@ public Builder applyToAllUnaryMethods( @Override public BigQueryReadSettings build() throws IOException { - return new BigQueryReadSettings(this); + BigQueryReadSettings settings = new BigQueryReadSettings(this); + settings.setReadRowsRetryAttemptListener(readRowsRetryAttemptListener); + return settings; } } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java index 351fd21c4f..2b3b4bf74f 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java @@ -31,6 +31,7 @@ import com.google.api.gax.tracing.SpanName; import com.google.api.gax.tracing.TracedServerStreamingCallable; import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadGrpc; +import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadSettings; import com.google.cloud.bigquery.storage.v1beta2.CreateReadSessionRequest; import com.google.cloud.bigquery.storage.v1beta2.ReadRowsRequest; import com.google.cloud.bigquery.storage.v1beta2.ReadRowsResponse; @@ -54,9 +55,17 @@ public class EnhancedBigQueryReadStub implements BackgroundResource { private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage"; private final GrpcBigQueryReadStub stub; private final BigQueryReadStubSettings stubSettings; + private final BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener; private final ClientContext context; - public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings settings) + public static EnhancedBigQueryReadStub create( + EnhancedBigQueryReadStubSettings settings) throws IOException { + return create(settings, null); + } + + public static EnhancedBigQueryReadStub create( + EnhancedBigQueryReadStubSettings settings, + BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener) throws IOException { // Configure the base settings. BigQueryReadStubSettings.Builder baseSettingsBuilder = @@ -88,14 +97,18 @@ public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings s BigQueryReadStubSettings baseSettings = baseSettingsBuilder.build(); ClientContext clientContext = ClientContext.create(baseSettings); GrpcBigQueryReadStub stub = new GrpcBigQueryReadStub(baseSettings, clientContext); - return new EnhancedBigQueryReadStub(stub, baseSettings, clientContext); + return new EnhancedBigQueryReadStub(stub, baseSettings, readRowsRetryAttemptListener, clientContext); } @InternalApi("Visible for testing") EnhancedBigQueryReadStub( - GrpcBigQueryReadStub stub, BigQueryReadStubSettings stubSettings, ClientContext context) { + GrpcBigQueryReadStub stub, + BigQueryReadStubSettings stubSettings, + BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener, + ClientContext context) { this.stub = stub; this.stubSettings = stubSettings; + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; this.context = context; } @@ -123,7 +136,7 @@ public Map extract(ReadRowsRequest request) { StreamingRetryAlgorithm retryAlgorithm = new StreamingRetryAlgorithm<>( - new ApiResultRetryAlgorithm(), + new ApiResultRetryAlgorithm(readRowsRetryAttemptListener), new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock())); ScheduledRetryingExecutor retryingExecutor = diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java index 2c887e1424..1d9cad46bd 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java @@ -21,6 +21,8 @@ import com.google.api.gax.retrying.TimedAttemptSettings; import com.google.api.gax.rpc.ApiException; import com.google.cloud.bigquery.storage.util.Errors; +import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadSettings; +import io.grpc.Metadata; import io.grpc.Status; import org.threeten.bp.Duration; @@ -30,17 +32,41 @@ public class ApiResultRetryAlgorithm implements ResultRetryAlgorithm< // Duration to sleep on if the error is DEADLINE_EXCEEDED. public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1); + private final BigQueryReadSettings.RetryAttemptListener retryAttemptListener; + + public ApiResultRetryAlgorithm() { + this(null); + } + + public ApiResultRetryAlgorithm(BigQueryReadSettings.RetryAttemptListener retryAttemptListener) { + super(); + this.retryAttemptListener = retryAttemptListener; + } + @Override public TimedAttemptSettings createNextAttempt( Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (Errors.isRetryableInternalStatus(status)) { + Metadata metadata = Status.trailersFromThrowable(prevThrowable); + Errors.IsRetryableStatusResult result = Errors.isRetryableStatus(status, metadata); + if (result.isRetryable) { + // If result.retryDelay isn't null, we know exactly how long we must wait, so both regular + // and randomized delays are the same. + Duration retryDelay = result.retryDelay; + Duration randomizedRetryDelay = result.retryDelay; + if (retryDelay == null) { + retryDelay = prevSettings.getRetryDelay(); + randomizedRetryDelay = DEADLINE_SLEEP_DURATION; + } + if (retryAttemptListener != null) { + retryAttemptListener.onRetryAttempt(status, metadata); + } return TimedAttemptSettings.newBuilder() .setGlobalSettings(prevSettings.getGlobalSettings()) - .setRetryDelay(prevSettings.getRetryDelay()) + .setRetryDelay(retryDelay) .setRpcTimeout(prevSettings.getRpcTimeout()) - .setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION) + .setRandomizedRetryDelay(randomizedRetryDelay) .setAttemptCount(prevSettings.getAttemptCount() + 1) .setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos()) .build(); @@ -53,7 +79,8 @@ public TimedAttemptSettings createNextAttempt( public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (Errors.isRetryableInternalStatus(status)) { + Metadata metadata = Status.trailersFromThrowable(prevThrowable); + if (Errors.isRetryableStatus(status, metadata).isRetryable) { return true; } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/util/ErrorsTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/util/ErrorsTest.java index fa885b424a..d5d7cfd835 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/util/ErrorsTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/util/ErrorsTest.java @@ -15,9 +15,15 @@ */ package com.google.cloud.bigquery.storage.util; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import com.google.protobuf.Duration; +import com.google.protobuf.Parser; +import com.google.rpc.RetryInfo; +import io.grpc.Metadata; import io.grpc.Status; import org.junit.Test; import org.junit.runner.RunWith; @@ -51,4 +57,76 @@ public void testNonRetryableOtherError() { Status.DATA_LOSS.withDescription( "RST_STREAM closed stream. HTTP/2 error code: INTERNAL_ERROR"))); } + + @Test + public void testIsRetryableStatus() { + Errors.IsRetryableStatusResult result = + Errors.isRetryableStatus( + Status.INTERNAL.withDescription( + "HTTP/2 error code: INTERNAL_ERROR\nReceived Rst stream"), null); + assertTrue(result.isRetryable); + assertNull(result.retryDelay); + + result = + Errors.isRetryableStatus( + Status.INTERNAL.withDescription( + "RST_STREAM closed stream. HTTP/2 error code: INTERNAL_ERROR"), null); + assertTrue(result.isRetryable); + assertNull(result.retryDelay); + + Metadata metadata = new Metadata(); + metadata.put( + Metadata.Key.of( + "some-key-bin", + new Metadata.BinaryMarshaller() { + @Override + public byte[] toBytes(Integer value) { + return new byte[]{}; + } + + @Override + public Integer parseBytes(byte[] serialized) { + return new Integer(1); + } + }), + new Integer(2)); + result = + Errors.isRetryableStatus( + Status.RESOURCE_EXHAUSTED.withDescription("You have run out of X quota"), metadata); + assertFalse(result.isRetryable); + assertNull(result.retryDelay); + + RetryInfo retryInfo = + RetryInfo.newBuilder() + .setRetryDelay(Duration.newBuilder().setSeconds(123).setNanos(456).build()) + .build(); + + metadata = new Metadata(); + metadata.put( + Metadata.Key.of( + "google.rpc.retryinfo-bin", + new Metadata.BinaryMarshaller() { + @Override + public byte[] toBytes(RetryInfo value) { + return value.toByteArray(); + } + + @Override + public RetryInfo parseBytes(byte[] serialized) { + try { + Parser parser = (RetryInfo.newBuilder().build()).getParserForType(); + return parser.parseFrom(serialized); + } catch (Exception e) { + return null; + } + } + }), + retryInfo); + + result = + Errors.isRetryableStatus( + Status.RESOURCE_EXHAUSTED.withDescription("Stop for a while"), metadata); + assertTrue(result.isRetryable); + assertEquals(result.retryDelay, org.threeten.bp.Duration.ofSeconds(123, 456)); + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java index df83a5a01a..2dd8985d81 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java @@ -26,9 +26,14 @@ import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.InvalidArgumentException; +import com.google.api.gax.rpc.ResourceExhaustedException; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.StatusCode; import com.google.protobuf.AbstractMessage; +import com.google.protobuf.Duration; +import com.google.protobuf.Parser; +import com.google.rpc.RetryInfo; +import io.grpc.Metadata; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; @@ -49,6 +54,8 @@ public class BigQueryReadClientTest { private static MockServiceHelper serviceHelper; private BigQueryReadClient client; private LocalChannelProvider channelProvider; + private int retryCount; + private Code lastRetryStatusCode; @BeforeClass public static void startStaticServer() { @@ -68,11 +75,22 @@ public static void stopServer() { public void setUp() throws IOException { serviceHelper.reset(); channelProvider = serviceHelper.createChannelProvider(); + retryCount = 0; + lastRetryStatusCode = Code.OK; BigQueryReadSettings settings = BigQueryReadSettings.newBuilder() .setTransportChannelProvider(channelProvider) .setCredentialsProvider(NoCredentialsProvider.create()) - .build(); + .setReadRowsRetryAttemptListener( + new BigQueryReadSettings.RetryAttemptListener() { + @Override + public void onRetryAttempt(Status prevStatus, Metadata prevMetadata) { + synchronized (this) { + retryCount += 1; + lastRetryStatusCode = prevStatus.getCode(); + } + } + }).build(); client = BigQueryReadClient.create(settings); } @@ -143,6 +161,9 @@ public void readRowsTest() throws Exception { List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); Assert.assertEquals(expectedResponse, actualResponses.get(0)); + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); } @Test @@ -165,6 +186,9 @@ public void readRowsExceptionTest() throws Exception { InvalidArgumentException apiException = (InvalidArgumentException) e.getCause(); Assert.assertEquals(StatusCode.Code.INVALID_ARGUMENT, apiException.getStatusCode().getCode()); } + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); } @Test @@ -189,6 +213,9 @@ public void readRowsRetryingEOSExceptionTest() throws ExecutionException, Interr callable.serverStreamingCall(request, responseObserver); List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.INTERNAL); } @Test @@ -213,5 +240,99 @@ public void readRowsRetryingHttp2StreamRstTest() throws ExecutionException, Inte callable.serverStreamingCall(request, responseObserver); List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.INTERNAL); + } + + @Test + @SuppressWarnings("all") + public void readRowsNoRetryForResourceExhaustedWithoutRetryInfo() + throws ExecutionException, InterruptedException { + ApiException exception = + new ResourceExhaustedException( + new StatusRuntimeException( + Status.RESOURCE_EXHAUSTED.withDescription( + "You are out of quota X")), + GrpcStatusCode.of(Code.RESOURCE_EXHAUSTED), + /* retryable = */ false); + mockBigQueryRead.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryRead.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + + try { + List actualResponses = responseObserver.future().get(); + Assert.fail("No exception thrown"); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof ResourceExhaustedException); + ResourceExhaustedException apiException = (ResourceExhaustedException) e.getCause(); + Assert.assertEquals(StatusCode.Code.RESOURCE_EXHAUSTED, apiException.getStatusCode().getCode()); + } + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); + } + + @Test + @SuppressWarnings("all") + public void readRowsNoRetryForResourceExhaustedWithRetryInfo() + throws ExecutionException, InterruptedException { + RetryInfo retryInfo = + RetryInfo.newBuilder() + .setRetryDelay(Duration.newBuilder().setSeconds(123).setNanos(456).build()) + .build(); + + Metadata metadata = new Metadata(); + metadata.put( + Metadata.Key.of( + "google.rpc.retryinfo-bin", + new Metadata.BinaryMarshaller() { + @Override + public byte[] toBytes(RetryInfo value) { + return value.toByteArray(); + } + + @Override + public RetryInfo parseBytes(byte[] serialized) { + try { + Parser parser = (RetryInfo.newBuilder().build()).getParserForType(); + return parser.parseFrom(serialized); + } catch (Exception e) { + return null; + } + } + }), + retryInfo); + + ApiException exception = + new ResourceExhaustedException( + new StatusRuntimeException( + Status.RESOURCE_EXHAUSTED.withDescription( + "Try again in a bit"), + metadata), + GrpcStatusCode.of(Code.RESOURCE_EXHAUSTED), + /* retryable = */ false); + mockBigQueryRead.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryRead.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + List actualResponses = responseObserver.future().get(); + Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.RESOURCE_EXHAUSTED); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java index 9dc725c9a1..b8ad38d97e 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java @@ -26,6 +26,7 @@ import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.InvalidArgumentException; +import com.google.api.gax.rpc.ResourceExhaustedException; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.StatusCode; import com.google.cloud.bigquery.storage.v1beta1.Storage.BatchCreateReadSessionStreamsRequest; @@ -41,7 +42,11 @@ import com.google.cloud.bigquery.storage.v1beta1.Storage.StreamPosition; import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto.TableReference; import com.google.protobuf.AbstractMessage; +import com.google.protobuf.Duration; import com.google.protobuf.Empty; +import com.google.protobuf.Parser; +import com.google.rpc.RetryInfo; +import io.grpc.Metadata; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; @@ -61,6 +66,8 @@ public class BigQueryStorageClientTest { private static MockServiceHelper serviceHelper; private BigQueryStorageClient client; private LocalChannelProvider channelProvider; + private int retryCount; + private Code lastRetryStatusCode; @BeforeClass public static void startStaticServer() { @@ -79,11 +86,22 @@ public static void stopServer() { public void setUp() throws IOException { serviceHelper.reset(); channelProvider = serviceHelper.createChannelProvider(); + retryCount = 0; + lastRetryStatusCode = Code.OK; BigQueryStorageSettings settings = BigQueryStorageSettings.newBuilder() .setTransportChannelProvider(channelProvider) .setCredentialsProvider(NoCredentialsProvider.create()) - .build(); + .setReadRowsRetryAttemptListener( + new BigQueryStorageSettings.RetryAttemptListener() { + @Override + public void onRetryAttempt(Status prevStatus, Metadata prevMetadata) { + synchronized (this) { + retryCount += 1; + lastRetryStatusCode = prevStatus.getCode(); + } + } + }).build(); client = BigQueryStorageClient.create(settings); } @@ -153,6 +171,9 @@ public void readRowsTest() throws Exception { List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); Assert.assertEquals(expectedResponse, actualResponses.get(0)); + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); } @Test @@ -176,6 +197,9 @@ public void readRowsExceptionTest() throws Exception { InvalidArgumentException apiException = (InvalidArgumentException) e.getCause(); Assert.assertEquals(StatusCode.Code.INVALID_ARGUMENT, apiException.getStatusCode().getCode()); } + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); } @Test @@ -319,6 +343,9 @@ public void readRowsRetryingEOSExceptionTest() throws ExecutionException, Interr callable.serverStreamingCall(request, responseObserver); List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.INTERNAL); } @Test @@ -343,5 +370,99 @@ public void readRowsRetryingHttp2StreamRstTest() throws ExecutionException, Inte callable.serverStreamingCall(request, responseObserver); List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.INTERNAL); + } + + @Test + @SuppressWarnings("all") + public void readRowsNoRetryForResourceExhaustedWithoutRetryInfo() + throws ExecutionException, InterruptedException { + ApiException exception = + new ResourceExhaustedException( + new StatusRuntimeException( + Status.RESOURCE_EXHAUSTED.withDescription( + "You are out of quota X")), + GrpcStatusCode.of(Code.RESOURCE_EXHAUSTED), + /* retryable = */ false); + mockBigQueryStorage.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryStorage.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + + try { + List actualResponses = responseObserver.future().get(); + Assert.fail("No exception thrown"); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof ResourceExhaustedException); + ResourceExhaustedException apiException = (ResourceExhaustedException) e.getCause(); + Assert.assertEquals(StatusCode.Code.RESOURCE_EXHAUSTED, apiException.getStatusCode().getCode()); + } + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); + } + + @Test + @SuppressWarnings("all") + public void readRowsNoRetryForResourceExhaustedWithRetryInfo() + throws ExecutionException, InterruptedException { + RetryInfo retryInfo = + RetryInfo.newBuilder() + .setRetryDelay(Duration.newBuilder().setSeconds(123).setNanos(456).build()) + .build(); + + Metadata metadata = new Metadata(); + metadata.put( + Metadata.Key.of( + "google.rpc.retryinfo-bin", + new Metadata.BinaryMarshaller() { + @Override + public byte[] toBytes(RetryInfo value) { + return value.toByteArray(); + } + + @Override + public RetryInfo parseBytes(byte[] serialized) { + try { + Parser parser = (RetryInfo.newBuilder().build()).getParserForType(); + return parser.parseFrom(serialized); + } catch (Exception e) { + return null; + } + } + }), + retryInfo); + + ApiException exception = + new ResourceExhaustedException( + new StatusRuntimeException( + Status.RESOURCE_EXHAUSTED.withDescription( + "Try again in a bit"), + metadata), + GrpcStatusCode.of(Code.RESOURCE_EXHAUSTED), + /* retryable = */ false); + mockBigQueryStorage.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryStorage.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + List actualResponses = responseObserver.future().get(); + Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.RESOURCE_EXHAUSTED); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java index 90bea22573..710451679a 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java @@ -26,9 +26,14 @@ import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.InvalidArgumentException; +import com.google.api.gax.rpc.ResourceExhaustedException; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.StatusCode; import com.google.protobuf.AbstractMessage; +import com.google.protobuf.Duration; +import com.google.protobuf.Parser; +import com.google.rpc.RetryInfo; +import io.grpc.Metadata; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; @@ -49,6 +54,8 @@ public class BigQueryReadClientTest { private static MockServiceHelper serviceHelper; private BigQueryReadClient client; private LocalChannelProvider channelProvider; + private int retryCount; + private Code lastRetryStatusCode; @BeforeClass public static void startStaticServer() { @@ -68,11 +75,22 @@ public static void stopServer() { public void setUp() throws IOException { serviceHelper.reset(); channelProvider = serviceHelper.createChannelProvider(); + retryCount = 0; + lastRetryStatusCode = Code.OK; BigQueryReadSettings settings = BigQueryReadSettings.newBuilder() .setTransportChannelProvider(channelProvider) .setCredentialsProvider(NoCredentialsProvider.create()) - .build(); + .setReadRowsRetryAttemptListener( + new BigQueryReadSettings.RetryAttemptListener() { + @Override + public void onRetryAttempt(Status prevStatus, Metadata prevMetadata) { + synchronized (this) { + retryCount += 1; + lastRetryStatusCode = prevStatus.getCode(); + } + } + }).build(); client = BigQueryReadClient.create(settings); } @@ -143,6 +161,9 @@ public void readRowsTest() throws Exception { List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); Assert.assertEquals(expectedResponse, actualResponses.get(0)); + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); } @Test @@ -165,6 +186,9 @@ public void readRowsExceptionTest() throws Exception { InvalidArgumentException apiException = (InvalidArgumentException) e.getCause(); Assert.assertEquals(StatusCode.Code.INVALID_ARGUMENT, apiException.getStatusCode().getCode()); } + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); } @Test @@ -189,6 +213,9 @@ public void readRowsRetryingEOSExceptionTest() throws ExecutionException, Interr callable.serverStreamingCall(request, responseObserver); List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.INTERNAL); } @Test @@ -213,5 +240,99 @@ public void readRowsRetryingHttp2StreamRstTest() throws ExecutionException, Inte callable.serverStreamingCall(request, responseObserver); List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.INTERNAL); + } + + @Test + @SuppressWarnings("all") + public void readRowsNoRetryForResourceExhaustedWithoutRetryInfo() + throws ExecutionException, InterruptedException { + ApiException exception = + new ResourceExhaustedException( + new StatusRuntimeException( + Status.RESOURCE_EXHAUSTED.withDescription( + "You are out of quota X")), + GrpcStatusCode.of(Code.RESOURCE_EXHAUSTED), + /* retryable = */ false); + mockBigQueryRead.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryRead.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + + try { + List actualResponses = responseObserver.future().get(); + Assert.fail("No exception thrown"); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof ResourceExhaustedException); + ResourceExhaustedException apiException = (ResourceExhaustedException) e.getCause(); + Assert.assertEquals(StatusCode.Code.RESOURCE_EXHAUSTED, apiException.getStatusCode().getCode()); + } + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); + } + + @Test + @SuppressWarnings("all") + public void readRowsNoRetryForResourceExhaustedWithRetryInfo() + throws ExecutionException, InterruptedException { + RetryInfo retryInfo = + RetryInfo.newBuilder() + .setRetryDelay(Duration.newBuilder().setSeconds(123).setNanos(456).build()) + .build(); + + Metadata metadata = new Metadata(); + metadata.put( + Metadata.Key.of( + "google.rpc.retryinfo-bin", + new Metadata.BinaryMarshaller() { + @Override + public byte[] toBytes(RetryInfo value) { + return value.toByteArray(); + } + + @Override + public RetryInfo parseBytes(byte[] serialized) { + try { + Parser parser = (RetryInfo.newBuilder().build()).getParserForType(); + return parser.parseFrom(serialized); + } catch (Exception e) { + return null; + } + } + }), + retryInfo); + + ApiException exception = + new ResourceExhaustedException( + new StatusRuntimeException( + Status.RESOURCE_EXHAUSTED.withDescription( + "Try again in a bit"), + metadata), + GrpcStatusCode.of(Code.RESOURCE_EXHAUSTED), + /* retryable = */ false); + mockBigQueryRead.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryRead.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + List actualResponses = responseObserver.future().get(); + Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.RESOURCE_EXHAUSTED); } }