Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: retry certain RESOURCE_EXHAUSTED errors observed during ReadRows and report retry attempts #1257

Merged
merged 1 commit into from
Aug 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,53 @@
*/
package com.google.cloud.bigquery.storage.util;

import com.google.rpc.RetryInfo;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.protobuf.ProtoUtils;
import org.threeten.bp.Duration;

/** Static utility methods for working with Errors returned from the service. */
public class Errors {
private Errors() {};

public static class IsRetryableStatusResult {
public boolean isRetryable = false;
public Duration retryDelay = null;
}

private static final Metadata.Key<RetryInfo> KEY_RETRY_INFO =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, we'll need to see if we have compatible key resolvers for other langs. I've not seen this before, but apparently its descriptor fullname and a "-bin" suffix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's exactly what it does. I'm having a hard time finding external docs about why it is supposed to be like that, but you can find other libraries interacting with gcp services using the same keys, e.g. https://github.com/googleapis/google-cloud-go/blob/master/spanner/retry.go#L33

ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());

/**
* Returns true iff the Status indicates an error that is retryable.
*
* <p>Generally, internal errors are not considered retryable, however there are certain transient
* network issues that appear as internal but are in fact retryable.
*
* <p>Resource exhausted errors are only considered retryable if metadata contains a serialized
* RetryInfo object.
*/
public static IsRetryableStatusResult isRetryableStatus(Status status, Metadata metadata) {
IsRetryableStatusResult result = new IsRetryableStatusResult();

result.isRetryable = isRetryableInternalStatus(status);
if (!result.isRetryable
&& status.getCode() == Status.Code.RESOURCE_EXHAUSTED
&& metadata != null
&& metadata.containsKey(KEY_RETRY_INFO)) {
RetryInfo retryInfo = metadata.get(KEY_RETRY_INFO);
if (retryInfo.hasRetryDelay()) {
result.isRetryable = true;
result.retryDelay =
Duration.ofSeconds(
retryInfo.getRetryDelay().getSeconds(), retryInfo.getRetryDelay().getNanos());
}
}

return result;
}

/**
* Returns true iff the Status indicates and internal error that is retryable.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ public static final BigQueryReadClient create(EnhancedBigQueryReadStub stub) {
*/
protected BigQueryReadClient(BigQueryReadSettings settings) throws IOException {
this.settings = settings;
this.stub = EnhancedBigQueryReadStub.create(settings.getTypedStubSettings());
this.stub =
EnhancedBigQueryReadStub.create(
settings.getTypedStubSettings(), settings.getReadRowsRetryAttemptListener());
}

@BetaApi("A restructuring of stub classes is planned, so this may break in the future")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStubSettings;
import io.grpc.Metadata;
import io.grpc.Status;
import java.io.IOException;
import java.util.List;

Expand Down Expand Up @@ -69,6 +71,26 @@ public ServerStreamingCallSettings<ReadRowsRequest, ReadRowsResponse> readRowsSe
return getTypedStubSettings().readRowsSettings();
}

public static interface RetryAttemptListener {
public void onRetryAttempt(Status prevStatus, Metadata prevMetadata);
}

private RetryAttemptListener readRowsRetryAttemptListener = null;

/**
* If a non null readRowsRetryAttemptListener is provided, client will call onRetryAttempt
* function before a failed ReadRows request is retried. This can be used as negative feedback
* mechanism for future decision to split read streams because some retried failures are due to
* resource exhaustion that increased parallelism only makes it worse.
*/
public void setReadRowsRetryAttemptListener(RetryAttemptListener readRowsRetryAttemptListener) {
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
}

public RetryAttemptListener getReadRowsRetryAttemptListener() {
return readRowsRetryAttemptListener;
}

/** Returns the object with the settings used for calls to splitReadStream. */
public UnaryCallSettings<SplitReadStreamRequest, SplitReadStreamResponse>
splitReadStreamSettings() {
Expand Down Expand Up @@ -176,6 +198,14 @@ public Builder applyToAllUnaryMethods(
return this;
}

private RetryAttemptListener readRowsRetryAttemptListener = null;

public Builder setReadRowsRetryAttemptListener(
RetryAttemptListener readRowsRetryAttemptListener) {
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
return this;
}

/** Returns the builder for the settings used for calls to createReadSession. */
public UnaryCallSettings.Builder<CreateReadSessionRequest, ReadSession>
createReadSessionSettings() {
Expand All @@ -196,7 +226,9 @@ public Builder applyToAllUnaryMethods(

@Override
public BigQueryReadSettings build() throws IOException {
return new BigQueryReadSettings(this);
BigQueryReadSettings settings = new BigQueryReadSettings(this);
settings.setReadRowsRetryAttemptListener(readRowsRetryAttemptListener);
return settings;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.api.gax.tracing.SpanName;
import com.google.api.gax.tracing.TracedServerStreamingCallable;
import com.google.cloud.bigquery.storage.v1.BigQueryReadGrpc;
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
Expand All @@ -54,10 +55,18 @@ public class EnhancedBigQueryReadStub implements BackgroundResource {
private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage";
private final GrpcBigQueryReadStub stub;
private final BigQueryReadStubSettings stubSettings;
private final BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener;
private final ClientContext context;

public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings settings)
throws IOException {
return create(settings, null);
}

public static EnhancedBigQueryReadStub create(
EnhancedBigQueryReadStubSettings settings,
BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener)
throws IOException {
// Configure the base settings.
BigQueryReadStubSettings.Builder baseSettingsBuilder =
BigQueryReadStubSettings.newBuilder()
Expand Down Expand Up @@ -88,14 +97,19 @@ public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings s
BigQueryReadStubSettings baseSettings = baseSettingsBuilder.build();
ClientContext clientContext = ClientContext.create(baseSettings);
GrpcBigQueryReadStub stub = new GrpcBigQueryReadStub(baseSettings, clientContext);
return new EnhancedBigQueryReadStub(stub, baseSettings, clientContext);
return new EnhancedBigQueryReadStub(
stub, baseSettings, readRowsRetryAttemptListener, clientContext);
}

@InternalApi("Visible for testing")
EnhancedBigQueryReadStub(
GrpcBigQueryReadStub stub, BigQueryReadStubSettings stubSettings, ClientContext context) {
GrpcBigQueryReadStub stub,
BigQueryReadStubSettings stubSettings,
BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener,
ClientContext context) {
this.stub = stub;
this.stubSettings = stubSettings;
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
this.context = context;
}

Expand Down Expand Up @@ -123,7 +137,7 @@ public Map<String, String> extract(ReadRowsRequest request) {

StreamingRetryAlgorithm<Void> retryAlgorithm =
new StreamingRetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
new ApiResultRetryAlgorithm<Void>(readRowsRetryAttemptListener),
new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock()));

ScheduledRetryingExecutor<Void> retryingExecutor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.bigquery.storage.util.Errors;
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
import io.grpc.Metadata;
import io.grpc.Status;
import org.threeten.bp.Duration;

Expand All @@ -30,17 +32,41 @@ public class ApiResultRetryAlgorithm<ResponseT> implements ResultRetryAlgorithm<
// Duration to sleep on if the error is DEADLINE_EXCEEDED.
public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1);

private final BigQueryReadSettings.RetryAttemptListener retryAttemptListener;

public ApiResultRetryAlgorithm() {
this(null);
}

public ApiResultRetryAlgorithm(BigQueryReadSettings.RetryAttemptListener retryAttemptListener) {
super();
this.retryAttemptListener = retryAttemptListener;
}

@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
if (prevThrowable != null) {
Status status = Status.fromThrowable(prevThrowable);
if (Errors.isRetryableInternalStatus(status)) {
Metadata metadata = Status.trailersFromThrowable(prevThrowable);
Errors.IsRetryableStatusResult result = Errors.isRetryableStatus(status, metadata);
if (result.isRetryable) {
// If result.retryDelay isn't null, we know exactly how long we must wait, so both regular
// and randomized delays are the same.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there still be variance for the randomized delay? result.retryDelay + jitter? Looks like the previous impl didn't jitter either so likely can be ignored if its not been a source of issues.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is needed in this case.

Duration retryDelay = result.retryDelay;
Duration randomizedRetryDelay = result.retryDelay;
if (retryDelay == null) {
retryDelay = prevSettings.getRetryDelay();
randomizedRetryDelay = DEADLINE_SLEEP_DURATION;
}
if (retryAttemptListener != null) {
retryAttemptListener.onRetryAttempt(status, metadata);
}
return TimedAttemptSettings.newBuilder()
.setGlobalSettings(prevSettings.getGlobalSettings())
.setRetryDelay(prevSettings.getRetryDelay())
.setRetryDelay(retryDelay)
.setRpcTimeout(prevSettings.getRpcTimeout())
.setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION)
.setRandomizedRetryDelay(randomizedRetryDelay)
.setAttemptCount(prevSettings.getAttemptCount() + 1)
.setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos())
.build();
Expand All @@ -53,7 +79,8 @@ public TimedAttemptSettings createNextAttempt(
public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) {
if (prevThrowable != null) {
Status status = Status.fromThrowable(prevThrowable);
if (Errors.isRetryableInternalStatus(status)) {
Metadata metadata = Status.trailersFromThrowable(prevThrowable);
if (Errors.isRetryableStatus(status, metadata).isRetryable) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ public static final BigQueryStorageClient create(EnhancedBigQueryStorageStub stu
*/
protected BigQueryStorageClient(BigQueryStorageSettings settings) throws IOException {
this.settings = settings;
this.stub = EnhancedBigQueryStorageStub.create(settings.getTypedStubSettings());
this.stub =
EnhancedBigQueryStorageStub.create(
settings.getTypedStubSettings(), settings.getReadRowsRetryAttemptListener());
}

@BetaApi("A restructuring of stub classes is planned, so this may break in the future")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import com.google.cloud.bigquery.storage.v1beta1.Storage.SplitReadStreamResponse;
import com.google.cloud.bigquery.storage.v1beta1.stub.EnhancedBigQueryStorageStubSettings;
import com.google.protobuf.Empty;
import io.grpc.Metadata;
import io.grpc.Status;
import java.io.IOException;
import java.util.List;

Expand Down Expand Up @@ -78,6 +80,26 @@ public ServerStreamingCallSettings<ReadRowsRequest, ReadRowsResponse> readRowsSe
return getTypedStubSettings().readRowsSettings();
}

public static interface RetryAttemptListener {
public void onRetryAttempt(Status prevStatus, Metadata prevMetadata);
}

private RetryAttemptListener readRowsRetryAttemptListener = null;

/**
* If a non null readRowsRetryAttemptListener is provided, client will call onRetryAttempt
* function before a failed ReadRows request is retried. This can be used as negative feedback
* mechanism for future decision to split read streams because some retried failures are due to
* resource exhaustion that increased parallelism only makes it worse.
*/
public void setReadRowsRetryAttemptListener(RetryAttemptListener readRowsRetryAttemptListener) {
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
}

public RetryAttemptListener getReadRowsRetryAttemptListener() {
return readRowsRetryAttemptListener;
}

/** Returns the object with the settings used for calls to batchCreateReadSessionStreams. */
public UnaryCallSettings<
BatchCreateReadSessionStreamsRequest, BatchCreateReadSessionStreamsResponse>
Expand Down Expand Up @@ -197,6 +219,14 @@ public Builder applyToAllUnaryMethods(
return this;
}

private RetryAttemptListener readRowsRetryAttemptListener = null;

public Builder setReadRowsRetryAttemptListener(
RetryAttemptListener readRowsRetryAttemptListener) {
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
return this;
}

/** Returns the builder for the settings used for calls to createReadSession. */
public UnaryCallSettings.Builder<CreateReadSessionRequest, ReadSession>
createReadSessionSettings() {
Expand Down Expand Up @@ -229,7 +259,9 @@ public UnaryCallSettings.Builder<FinalizeStreamRequest, Empty> finalizeStreamSet

@Override
public BigQueryStorageSettings build() throws IOException {
return new BigQueryStorageSettings(this);
BigQueryStorageSettings settings = new BigQueryStorageSettings(this);
settings.setReadRowsRetryAttemptListener(readRowsRetryAttemptListener);
return settings;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.api.gax.tracing.SpanName;
import com.google.api.gax.tracing.TracedServerStreamingCallable;
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageGrpc;
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageSettings;
import com.google.cloud.bigquery.storage.v1beta1.Storage.BatchCreateReadSessionStreamsRequest;
import com.google.cloud.bigquery.storage.v1beta1.Storage.BatchCreateReadSessionStreamsResponse;
import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest;
Expand Down Expand Up @@ -58,10 +59,18 @@ public class EnhancedBigQueryStorageStub implements BackgroundResource {
private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage";
private final GrpcBigQueryStorageStub stub;
private final BigQueryStorageStubSettings stubSettings;
private final BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener;
private final ClientContext context;

public static EnhancedBigQueryStorageStub create(EnhancedBigQueryStorageStubSettings settings)
throws IOException {
return create(settings, null);
}

public static EnhancedBigQueryStorageStub create(
EnhancedBigQueryStorageStubSettings settings,
BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener)
throws IOException {
// Configure the base settings.
BigQueryStorageStubSettings.Builder baseSettingsBuilder =
BigQueryStorageStubSettings.newBuilder()
Expand Down Expand Up @@ -107,16 +116,19 @@ public static EnhancedBigQueryStorageStub create(EnhancedBigQueryStorageStubSett
BigQueryStorageStubSettings baseSettings = baseSettingsBuilder.build();
ClientContext clientContext = ClientContext.create(baseSettings);
GrpcBigQueryStorageStub stub = new GrpcBigQueryStorageStub(baseSettings, clientContext);
return new EnhancedBigQueryStorageStub(stub, baseSettings, clientContext);
return new EnhancedBigQueryStorageStub(
stub, baseSettings, readRowsRetryAttemptListener, clientContext);
}

@InternalApi("Visible for testing")
EnhancedBigQueryStorageStub(
GrpcBigQueryStorageStub stub,
BigQueryStorageStubSettings stubSettings,
BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener,
ClientContext context) {
this.stub = stub;
this.stubSettings = stubSettings;
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
this.context = context;
}

Expand Down Expand Up @@ -145,7 +157,7 @@ public Map<String, String> extract(ReadRowsRequest request) {

StreamingRetryAlgorithm<Void> retryAlgorithm =
new StreamingRetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
new ApiResultRetryAlgorithm<Void>(readRowsRetryAttemptListener),
new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock()));

ScheduledRetryingExecutor<Void> retryingExecutor =
Expand Down
Loading