Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add a timeout on retry for retryable errors #1930

Merged
merged 12 commits into from
Jan 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage'
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.0'
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.1'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.0"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.1"
```

## Authentication
Expand Down
10 changes: 10 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,14 @@
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
<method>com.google.cloud.bigquery.storage.v1.TableSchema getUpdatedSchema()</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
<method>ConnectionWorker(java.lang.String, com.google.cloud.bigquery.storage.v1.ProtoSchema, long, long, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool</className>
<method>ConnectionWorkerPool(long, long, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean)</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Duration;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
Expand Down Expand Up @@ -61,6 +62,7 @@ public class ConnectionWorker implements AutoCloseable {
private Lock lock;
private Condition hasMessageInWaitingQueue;
private Condition inflightReduced;
private static Duration maxRetryDuration = Duration.ofMinutes(5);

/*
* The identifier of the current stream to write to. This stream name can change during
Expand Down Expand Up @@ -114,6 +116,9 @@ public class ConnectionWorker implements AutoCloseable {
@GuardedBy("lock")
private long conectionRetryCountWithoutCallback = 0;

@GuardedBy("lock")
private long connectionRetryStartTime = 0;

/*
* If false, streamConnection needs to be reset.
*/
Expand Down Expand Up @@ -201,6 +206,7 @@ public ConnectionWorker(
ProtoSchema writerSchema,
long maxInflightRequests,
long maxInflightBytes,
Duration maxRetryDuration,
FlowController.LimitExceededBehavior limitExceededBehavior,
String traceId,
BigQueryWriteClient client,
Expand All @@ -210,6 +216,7 @@ public ConnectionWorker(
this.hasMessageInWaitingQueue = lock.newCondition();
this.inflightReduced = lock.newCondition();
this.streamName = streamName;
this.maxRetryDuration = maxRetryDuration;
if (writerSchema == null) {
throw new StatusRuntimeException(
Status.fromCode(Code.INVALID_ARGUMENT)
Expand Down Expand Up @@ -237,6 +244,7 @@ public void run() {
}

private void resetConnection() {
log.info("Reconnecting for stream:" + streamName);
this.streamConnection =
new StreamConnection(
this.client,
Expand Down Expand Up @@ -618,6 +626,9 @@ private void requestCallback(AppendRowsResponse response) {
if (conectionRetryCountWithoutCallback != 0) {
conectionRetryCountWithoutCallback = 0;
}
if (connectionRetryStartTime != 0) {
connectionRetryStartTime = 0;
}
if (!this.inflightRequestQueue.isEmpty()) {
requestWrapper = pollInflightRequestQueue();
} else if (inflightCleanuped) {
Expand Down Expand Up @@ -686,15 +697,25 @@ private void doneCallback(Throwable finalStatus) {
try {
this.streamConnectionIsConnected = false;
if (connectionFinalStatus == null) {
if (connectionRetryStartTime == 0) {
connectionRetryStartTime = System.currentTimeMillis();
}
// If the error can be retried, don't set it here, let it try to retry later on.
if (isRetriableError(finalStatus) && !userClosed) {
if (isRetriableError(finalStatus)
&& !userClosed
&& (maxRetryDuration.toMillis() == 0f
|| System.currentTimeMillis() - connectionRetryStartTime
<= maxRetryDuration.toMillis())) {
this.conectionRetryCountWithoutCallback++;
log.info(
"Retriable error "
+ finalStatus.toString()
+ " received, retry count "
+ conectionRetryCountWithoutCallback
+ " for stream "
+ ", millis left to retry "
+ (maxRetryDuration.toMillis()
- (System.currentTimeMillis() - connectionRetryStartTime))
+ ", for stream "
+ streamName);
} else {
Exceptions.StorageException storageException = Exceptions.toStorageException(finalStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public class ConnectionWorkerPool {
*/
private final long maxInflightBytes;

/*
* Max retry duration for retryable errors.
*/
private final java.time.Duration maxRetryDuration;

/*
* Behavior when inflight queue is exceeded. Only supports Block or Throw, default is Block.
*/
Expand Down Expand Up @@ -196,12 +201,14 @@ public abstract static class Builder {
public ConnectionWorkerPool(
long maxInflightRequests,
long maxInflightBytes,
java.time.Duration maxRetryDuration,
FlowController.LimitExceededBehavior limitExceededBehavior,
String traceId,
BigQueryWriteClient client,
boolean ownsBigQueryWriteClient) {
this.maxInflightRequests = maxInflightRequests;
this.maxInflightBytes = maxInflightBytes;
this.maxRetryDuration = maxRetryDuration;
this.limitExceededBehavior = limitExceededBehavior;
this.traceId = traceId;
this.client = client;
Expand Down Expand Up @@ -356,6 +363,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
writeSchema,
maxInflightRequests,
maxInflightBytes,
maxRetryDuration,
limitExceededBehavior,
traceId,
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
Expand Down Expand Up @@ -193,6 +194,7 @@ private StreamWriter(Builder builder) throws IOException {
builder.writerSchema,
builder.maxInflightRequest,
builder.maxInflightBytes,
builder.maxRetryDuration,
builder.limitExceededBehavior,
builder.traceId,
getBigQueryWriteClient(builder),
Expand Down Expand Up @@ -251,6 +253,7 @@ private StreamWriter(Builder builder) throws IOException {
return new ConnectionWorkerPool(
builder.maxInflightRequest,
builder.maxInflightBytes,
builder.maxRetryDuration,
builder.limitExceededBehavior,
builder.traceId,
client,
Expand Down Expand Up @@ -494,6 +497,8 @@ public static final class Builder {

private boolean enableConnectionPool = false;

private java.time.Duration maxRetryDuration = Duration.ofMinutes(5);

private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
Expand Down Expand Up @@ -602,6 +607,15 @@ public Builder setLimitExceededBehavior(
return this;
}

/*
* Max duration to retry on retryable errors. Default is 5 minutes. You can allow unlimited
* retry by setting the value to be 0.
*/
public Builder setMaxRetryDuration(java.time.Duration maxRetryDuration) {
this.maxRetryDuration = maxRetryDuration;
return this;
}

/** Builds the {@code StreamWriterV2}. */
public StreamWriter build() throws IOException {
return new StreamWriter(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ private void testSendRequestsToMultiTable(
.setMaxConnectionsPerRegion(maxConnections)
.build());
ConnectionWorkerPool connectionWorkerPool =
createConnectionWorkerPool(maxRequests, /*maxBytes=*/ 100000);
createConnectionWorkerPool(
maxRequests, /*maxBytes=*/ 100000, java.time.Duration.ofSeconds(5));

// Sets the sleep time to simulate requests stuck in connection.
testBigQueryWrite.setResponseSleep(Duration.ofMillis(50L));
Expand Down Expand Up @@ -206,7 +207,8 @@ public void testMultiStreamClosed_multiplexingEnabled() throws Exception {
ConnectionWorkerPool.setOptions(
Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
ConnectionWorkerPool connectionWorkerPool =
createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 1000);
createConnectionWorkerPool(
/*maxRequests=*/ 3, /*maxBytes=*/ 1000, java.time.Duration.ofSeconds(5));

// Sets the sleep time to simulate requests stuck in connection.
testBigQueryWrite.setResponseSleep(Duration.ofMillis(50L));
Expand Down Expand Up @@ -255,7 +257,8 @@ public void testMultiStreamAppend_appendWhileClosing() throws Exception {
ConnectionWorkerPool.setOptions(
Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
ConnectionWorkerPool connectionWorkerPool =
createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 100000);
createConnectionWorkerPool(
/*maxRequests=*/ 3, /*maxBytes=*/ 100000, java.time.Duration.ofSeconds(5));

// Sets the sleep time to simulate requests stuck in connection.
testBigQueryWrite.setResponseSleep(Duration.ofMillis(50L));
Expand Down Expand Up @@ -368,11 +371,13 @@ private ProtoRows createProtoRows(String[] messages) {
return rowsBuilder.build();
}

ConnectionWorkerPool createConnectionWorkerPool(long maxRequests, long maxBytes) {
ConnectionWorkerPool createConnectionWorkerPool(
long maxRequests, long maxBytes, java.time.Duration maxRetryDuration) {
ConnectionWorkerPool.enableTestingLogic();
return new ConnectionWorkerPool(
maxRequests,
maxBytes,
maxRetryDuration,
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,16 +290,23 @@ private AppendRowsResponse createAppendResponse(long offset) {

private ConnectionWorker createConnectionWorker() throws IOException {
// By default use only the first table as table reference.
return createConnectionWorker(TEST_STREAM_1, TEST_TRACE_ID, 100, 1000);
return createConnectionWorker(
TEST_STREAM_1, TEST_TRACE_ID, 100, 1000, java.time.Duration.ofSeconds(5));
}

private ConnectionWorker createConnectionWorker(
String streamName, String traceId, long maxRequests, long maxBytes) throws IOException {
String streamName,
String traceId,
long maxRequests,
long maxBytes,
java.time.Duration maxRetryDuration)
throws IOException {
return new ConnectionWorker(
streamName,
createProtoSchema("foo"),
maxRequests,
maxBytes,
maxRetryDuration,
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ public void setTimesToClose(long numberTimesToClose) {
serviceImpl.setTimesToClose(numberTimesToClose);
}

public void setCloseForeverAfter(long closeForeverAfter) {
yirutang marked this conversation as resolved.
Show resolved Hide resolved
serviceImpl.setCloseForeverAfter(closeForeverAfter);
}

public long getConnectionCount() {
return serviceImpl.getConnectionCount();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase {
private long closeAfter = 0;
private long recordCount = 0;
private long connectionCount = 0;
private long closeForeverAfter = 0;

// Record whether the first record has been seen on a connection.
private final Map<StreamObserver<AppendRowsResponse>, Boolean> connectionToFirstRequest =
Expand Down Expand Up @@ -177,6 +178,9 @@ public void onNext(AppendRowsRequest value) {
&& (numberTimesToClose == 0 || connectionCount <= numberTimesToClose)) {
LOG.info("Shutting down connection from test...");
responseObserver.onError(Status.ABORTED.asException());
} else if (closeForeverAfter > 0 && recordCount > closeForeverAfter) {
LOG.info("Shutting down connection from test...");
responseObserver.onError(Status.ABORTED.asException());
} else {
final Response response = responses.get(offset);
sendResponse(response, responseObserver);
Expand Down Expand Up @@ -279,4 +283,10 @@ public void setCloseEveryNAppends(long closeAfter) {
public void setTimesToClose(long numberTimesToClose) {
this.numberTimesToClose = numberTimesToClose;
}

/* The connection will forever return failure after numberTimesToClose. This option shouldn't
* be used together with setCloseEveryNAppends and setTimesToClose*/
public void setCloseForeverAfter(long closeForeverAfter) {
this.closeForeverAfter = closeForeverAfter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.rpc.AbortedException;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.UnknownException;
Expand Down Expand Up @@ -129,13 +130,15 @@ private StreamWriter getMultiplexingTestStreamWriter() throws IOException {
.setTraceId(TEST_TRACE_ID)
.setLocation("US")
.setEnableConnectionPool(true)
.setMaxRetryDuration(java.time.Duration.ofSeconds(5))
.build();
}

private StreamWriter getTestStreamWriter() throws IOException {
return StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.setMaxRetryDuration(java.time.Duration.ofSeconds(5))
.build();
}

Expand Down Expand Up @@ -884,6 +887,48 @@ public void testAppendWithResetSuccess() throws Exception {
}
}

@Test
public void testAppendWithResetNeverSuccess() throws Exception {
try (StreamWriter writer = getTestStreamWriter()) {
testBigQueryWrite.setCloseForeverAfter(1);
long appendCount = 100;
for (long i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
for (long i = 0; i < appendCount; i++) {
futures.add(sendTestMessage(writer, new String[] {String.valueOf(i)}, i));
}
// first request succeeded.
assertEquals(futures.get(0).get().getAppendResult().getOffset().getValue(), 0);
// after 5 seconds, the requests will bail out.
for (int i = 1; i < appendCount; i++) {
assertFutureException(AbortedException.class, futures.get(i));
}
}
}

@Test
public void testAppendWithResetNeverSuccessWithMultiplexing() throws Exception {
try (StreamWriter writer = getMultiplexingTestStreamWriter()) {
testBigQueryWrite.setCloseForeverAfter(1);
long appendCount = 100;
for (long i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
for (long i = 0; i < appendCount; i++) {
futures.add(sendTestMessage(writer, new String[] {String.valueOf(i)}, i));
}
// first request succeeded.
assertEquals(futures.get(0).get().getAppendResult().getOffset().getValue(), 0);
// after 5 seconds, the requests will bail out.
for (int i = 1; i < appendCount; i++) {
assertFutureException(AbortedException.class, futures.get(i));
}
}
}

// This test is setup for the server to force a retry after all records are sent. Ensure the
// records are resent, even if no new records are appeneded.
@Test
Expand Down