diff --git a/README.md b/README.md
index 364cf27829..765e396ddb 100644
--- a/README.md
+++ b/README.md
@@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage'
If you are using Gradle without BOM, add this to your dependencies:
```Groovy
-implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.0'
+implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.1'
```
If you are using SBT, add this to your dependencies:
```Scala
-libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.0"
+libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.1"
```
## Authentication
diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
index 080a8c33f3..b33dced533 100644
--- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml
+++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
@@ -76,4 +76,14 @@
com/google/cloud/bigquery/storage/v1/ConnectionWorker
com.google.cloud.bigquery.storage.v1.TableSchema getUpdatedSchema()
+
+ 7004
+ com/google/cloud/bigquery/storage/v1/ConnectionWorker
+ ConnectionWorker(java.lang.String, com.google.cloud.bigquery.storage.v1.ProtoSchema, long, long, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean)
+
+
+ 7004
+ com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool
+ ConnectionWorkerPool(long, long, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean)
+
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
index 3520ad0a98..50086e95e2 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
@@ -31,6 +31,7 @@
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
+import java.time.Duration;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
@@ -61,6 +62,7 @@ public class ConnectionWorker implements AutoCloseable {
private Lock lock;
private Condition hasMessageInWaitingQueue;
private Condition inflightReduced;
+ private static Duration maxRetryDuration = Duration.ofMinutes(5);
/*
* The identifier of the current stream to write to. This stream name can change during
@@ -114,6 +116,9 @@ public class ConnectionWorker implements AutoCloseable {
@GuardedBy("lock")
private long conectionRetryCountWithoutCallback = 0;
+ @GuardedBy("lock")
+ private long connectionRetryStartTime = 0;
+
/*
* If false, streamConnection needs to be reset.
*/
@@ -201,6 +206,7 @@ public ConnectionWorker(
ProtoSchema writerSchema,
long maxInflightRequests,
long maxInflightBytes,
+ Duration maxRetryDuration,
FlowController.LimitExceededBehavior limitExceededBehavior,
String traceId,
BigQueryWriteClient client,
@@ -210,6 +216,7 @@ public ConnectionWorker(
this.hasMessageInWaitingQueue = lock.newCondition();
this.inflightReduced = lock.newCondition();
this.streamName = streamName;
+ this.maxRetryDuration = maxRetryDuration;
if (writerSchema == null) {
throw new StatusRuntimeException(
Status.fromCode(Code.INVALID_ARGUMENT)
@@ -237,6 +244,7 @@ public void run() {
}
private void resetConnection() {
+ log.info("Reconnecting for stream:" + streamName);
this.streamConnection =
new StreamConnection(
this.client,
@@ -618,6 +626,9 @@ private void requestCallback(AppendRowsResponse response) {
if (conectionRetryCountWithoutCallback != 0) {
conectionRetryCountWithoutCallback = 0;
}
+ if (connectionRetryStartTime != 0) {
+ connectionRetryStartTime = 0;
+ }
if (!this.inflightRequestQueue.isEmpty()) {
requestWrapper = pollInflightRequestQueue();
} else if (inflightCleanuped) {
@@ -686,15 +697,25 @@ private void doneCallback(Throwable finalStatus) {
try {
this.streamConnectionIsConnected = false;
if (connectionFinalStatus == null) {
+ if (connectionRetryStartTime == 0) {
+ connectionRetryStartTime = System.currentTimeMillis();
+ }
// If the error can be retried, don't set it here, let it try to retry later on.
- if (isRetriableError(finalStatus) && !userClosed) {
+ if (isRetriableError(finalStatus)
+ && !userClosed
+ && (maxRetryDuration.toMillis() == 0f
+ || System.currentTimeMillis() - connectionRetryStartTime
+ <= maxRetryDuration.toMillis())) {
this.conectionRetryCountWithoutCallback++;
log.info(
"Retriable error "
+ finalStatus.toString()
+ " received, retry count "
+ conectionRetryCountWithoutCallback
- + " for stream "
+ + ", millis left to retry "
+ + (maxRetryDuration.toMillis()
+ - (System.currentTimeMillis() - connectionRetryStartTime))
+ + ", for stream "
+ streamName);
} else {
Exceptions.StorageException storageException = Exceptions.toStorageException(finalStatus);
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
index e119f4c560..121b1d0e28 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
@@ -57,6 +57,11 @@ public class ConnectionWorkerPool {
*/
private final long maxInflightBytes;
+ /*
+ * Max retry duration for retryable errors.
+ */
+ private final java.time.Duration maxRetryDuration;
+
/*
* Behavior when inflight queue is exceeded. Only supports Block or Throw, default is Block.
*/
@@ -196,12 +201,14 @@ public abstract static class Builder {
public ConnectionWorkerPool(
long maxInflightRequests,
long maxInflightBytes,
+ java.time.Duration maxRetryDuration,
FlowController.LimitExceededBehavior limitExceededBehavior,
String traceId,
BigQueryWriteClient client,
boolean ownsBigQueryWriteClient) {
this.maxInflightRequests = maxInflightRequests;
this.maxInflightBytes = maxInflightBytes;
+ this.maxRetryDuration = maxRetryDuration;
this.limitExceededBehavior = limitExceededBehavior;
this.traceId = traceId;
this.client = client;
@@ -356,6 +363,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
writeSchema,
maxInflightRequests,
maxInflightBytes,
+ maxRetryDuration,
limitExceededBehavior,
traceId,
client,
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
index 4d07dfdd91..ff7dad474d 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
@@ -29,6 +29,7 @@
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
+import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
@@ -193,6 +194,7 @@ private StreamWriter(Builder builder) throws IOException {
builder.writerSchema,
builder.maxInflightRequest,
builder.maxInflightBytes,
+ builder.maxRetryDuration,
builder.limitExceededBehavior,
builder.traceId,
getBigQueryWriteClient(builder),
@@ -251,6 +253,7 @@ private StreamWriter(Builder builder) throws IOException {
return new ConnectionWorkerPool(
builder.maxInflightRequest,
builder.maxInflightBytes,
+ builder.maxRetryDuration,
builder.limitExceededBehavior,
builder.traceId,
client,
@@ -494,6 +497,8 @@ public static final class Builder {
private boolean enableConnectionPool = false;
+ private java.time.Duration maxRetryDuration = Duration.ofMinutes(5);
+
private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
@@ -602,6 +607,15 @@ public Builder setLimitExceededBehavior(
return this;
}
+ /*
+ * Max duration to retry on retryable errors. Default is 5 minutes. You can allow unlimited
+ * retry by setting the value to be 0.
+ */
+ public Builder setMaxRetryDuration(java.time.Duration maxRetryDuration) {
+ this.maxRetryDuration = maxRetryDuration;
+ return this;
+ }
+
/** Builds the {@code StreamWriterV2}. */
public StreamWriter build() throws IOException {
return new StreamWriter(this);
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java
index 08543f539d..961ad3fdc1 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java
@@ -153,7 +153,8 @@ private void testSendRequestsToMultiTable(
.setMaxConnectionsPerRegion(maxConnections)
.build());
ConnectionWorkerPool connectionWorkerPool =
- createConnectionWorkerPool(maxRequests, /*maxBytes=*/ 100000);
+ createConnectionWorkerPool(
+ maxRequests, /*maxBytes=*/ 100000, java.time.Duration.ofSeconds(5));
// Sets the sleep time to simulate requests stuck in connection.
testBigQueryWrite.setResponseSleep(Duration.ofMillis(50L));
@@ -206,7 +207,8 @@ public void testMultiStreamClosed_multiplexingEnabled() throws Exception {
ConnectionWorkerPool.setOptions(
Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
ConnectionWorkerPool connectionWorkerPool =
- createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 1000);
+ createConnectionWorkerPool(
+ /*maxRequests=*/ 3, /*maxBytes=*/ 1000, java.time.Duration.ofSeconds(5));
// Sets the sleep time to simulate requests stuck in connection.
testBigQueryWrite.setResponseSleep(Duration.ofMillis(50L));
@@ -255,7 +257,8 @@ public void testMultiStreamAppend_appendWhileClosing() throws Exception {
ConnectionWorkerPool.setOptions(
Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
ConnectionWorkerPool connectionWorkerPool =
- createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 100000);
+ createConnectionWorkerPool(
+ /*maxRequests=*/ 3, /*maxBytes=*/ 100000, java.time.Duration.ofSeconds(5));
// Sets the sleep time to simulate requests stuck in connection.
testBigQueryWrite.setResponseSleep(Duration.ofMillis(50L));
@@ -368,11 +371,13 @@ private ProtoRows createProtoRows(String[] messages) {
return rowsBuilder.build();
}
- ConnectionWorkerPool createConnectionWorkerPool(long maxRequests, long maxBytes) {
+ ConnectionWorkerPool createConnectionWorkerPool(
+ long maxRequests, long maxBytes, java.time.Duration maxRetryDuration) {
ConnectionWorkerPool.enableTestingLogic();
return new ConnectionWorkerPool(
maxRequests,
maxBytes,
+ maxRetryDuration,
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
client,
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java
index a2258ad430..8db4b072b1 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java
@@ -290,16 +290,23 @@ private AppendRowsResponse createAppendResponse(long offset) {
private ConnectionWorker createConnectionWorker() throws IOException {
// By default use only the first table as table reference.
- return createConnectionWorker(TEST_STREAM_1, TEST_TRACE_ID, 100, 1000);
+ return createConnectionWorker(
+ TEST_STREAM_1, TEST_TRACE_ID, 100, 1000, java.time.Duration.ofSeconds(5));
}
private ConnectionWorker createConnectionWorker(
- String streamName, String traceId, long maxRequests, long maxBytes) throws IOException {
+ String streamName,
+ String traceId,
+ long maxRequests,
+ long maxBytes,
+ java.time.Duration maxRetryDuration)
+ throws IOException {
return new ConnectionWorker(
streamName,
createProtoSchema("foo"),
maxRequests,
maxBytes,
+ maxRetryDuration,
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
client,
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java
index 5ba2f2aa1e..d707bbf976 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java
@@ -91,6 +91,10 @@ public void setTimesToClose(long numberTimesToClose) {
serviceImpl.setTimesToClose(numberTimesToClose);
}
+ public void setCloseForeverAfter(long closeForeverAfter) {
+ serviceImpl.setCloseForeverAfter(closeForeverAfter);
+ }
+
public long getConnectionCount() {
return serviceImpl.getConnectionCount();
}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java
index 02223ace82..db900100ad 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java
@@ -57,6 +57,7 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase {
private long closeAfter = 0;
private long recordCount = 0;
private long connectionCount = 0;
+ private long closeForeverAfter = 0;
// Record whether the first record has been seen on a connection.
private final Map, Boolean> connectionToFirstRequest =
@@ -177,6 +178,9 @@ public void onNext(AppendRowsRequest value) {
&& (numberTimesToClose == 0 || connectionCount <= numberTimesToClose)) {
LOG.info("Shutting down connection from test...");
responseObserver.onError(Status.ABORTED.asException());
+ } else if (closeForeverAfter > 0 && recordCount > closeForeverAfter) {
+ LOG.info("Shutting down connection from test...");
+ responseObserver.onError(Status.ABORTED.asException());
} else {
final Response response = responses.get(offset);
sendResponse(response, responseObserver);
@@ -279,4 +283,10 @@ public void setCloseEveryNAppends(long closeAfter) {
public void setTimesToClose(long numberTimesToClose) {
this.numberTimesToClose = numberTimesToClose;
}
+
+ /* The connection will forever return failure after numberTimesToClose. This option shouldn't
+ * be used together with setCloseEveryNAppends and setTimesToClose*/
+ public void setCloseForeverAfter(long closeForeverAfter) {
+ this.closeForeverAfter = closeForeverAfter;
+ }
}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
index 50e43fe45d..f8822e231f 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
@@ -25,6 +25,7 @@
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
+import com.google.api.gax.rpc.AbortedException;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.UnknownException;
@@ -129,6 +130,7 @@ private StreamWriter getMultiplexingTestStreamWriter() throws IOException {
.setTraceId(TEST_TRACE_ID)
.setLocation("US")
.setEnableConnectionPool(true)
+ .setMaxRetryDuration(java.time.Duration.ofSeconds(5))
.build();
}
@@ -136,6 +138,7 @@ private StreamWriter getTestStreamWriter() throws IOException {
return StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
+ .setMaxRetryDuration(java.time.Duration.ofSeconds(5))
.build();
}
@@ -884,6 +887,48 @@ public void testAppendWithResetSuccess() throws Exception {
}
}
+ @Test
+ public void testAppendWithResetNeverSuccess() throws Exception {
+ try (StreamWriter writer = getTestStreamWriter()) {
+ testBigQueryWrite.setCloseForeverAfter(1);
+ long appendCount = 100;
+ for (long i = 0; i < appendCount; i++) {
+ testBigQueryWrite.addResponse(createAppendResponse(i));
+ }
+ List> futures = new ArrayList<>();
+ for (long i = 0; i < appendCount; i++) {
+ futures.add(sendTestMessage(writer, new String[] {String.valueOf(i)}, i));
+ }
+ // first request succeeded.
+ assertEquals(futures.get(0).get().getAppendResult().getOffset().getValue(), 0);
+ // after 5 seconds, the requests will bail out.
+ for (int i = 1; i < appendCount; i++) {
+ assertFutureException(AbortedException.class, futures.get(i));
+ }
+ }
+ }
+
+ @Test
+ public void testAppendWithResetNeverSuccessWithMultiplexing() throws Exception {
+ try (StreamWriter writer = getMultiplexingTestStreamWriter()) {
+ testBigQueryWrite.setCloseForeverAfter(1);
+ long appendCount = 100;
+ for (long i = 0; i < appendCount; i++) {
+ testBigQueryWrite.addResponse(createAppendResponse(i));
+ }
+ List> futures = new ArrayList<>();
+ for (long i = 0; i < appendCount; i++) {
+ futures.add(sendTestMessage(writer, new String[] {String.valueOf(i)}, i));
+ }
+ // first request succeeded.
+ assertEquals(futures.get(0).get().getAppendResult().getOffset().getValue(), 0);
+ // after 5 seconds, the requests will bail out.
+ for (int i = 1; i < appendCount; i++) {
+ assertFutureException(AbortedException.class, futures.get(i));
+ }
+ }
+ }
+
// This test is setup for the server to force a retry after all records are sent. Ensure the
// records are resent, even if no new records are appeneded.
@Test