From cb2844e36058c7c6f8cb19842e2397394cf5dcd7 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Fri, 3 Feb 2023 19:41:52 +0000 Subject: [PATCH 1/9] feat: Add userClose flag back to StreamWriter --- .../bigquery/storage/v1/ConnectionWorker.java | 9 +++++ .../bigquery/storage/v1/StreamWriter.java | 35 +++++++++++++++++++ .../bigquery/storage/v1/StreamWriterTest.java | 2 ++ 3 files changed, 46 insertions(+) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 8ca9304fe1..bb70159213 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -273,6 +273,15 @@ ApiFuture append( return appendInternal(requestBuilder.build()); } + Boolean isClosed() { + this.lock.lock(); + try { + return userClosed; + } finally { + this.lock.unlock(); + } + } + private ApiFuture appendInternal(AppendRowsRequest message) { AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message); if (requestWrapper.messageSize > getApiMaxRequestBytes()) { diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 337ff86a66..1bcaff52d1 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -23,6 +23,7 @@ import com.google.auto.value.AutoOneOf; import com.google.auto.value.AutoValue; import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp; +import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import io.grpc.Status; @@ -36,6 +37,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -70,6 +72,11 @@ public class StreamWriter implements AutoCloseable { */ private final String location; + /* + * If user has closed the StreamWriter. + */ + private boolean userClose; + /* * A String that uniquely identifies this writer. */ @@ -94,6 +101,8 @@ public class StreamWriter implements AutoCloseable { /** Creation timestamp of this streamwriter */ private final long creationTimestamp; + private Lock lock; + /** The maximum size of one request. Defined by the API. */ public static long getApiMaxRequestBytes() { return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) @@ -142,6 +151,12 @@ public ApiFuture append( } public void close(StreamWriter streamWriter) { + this.lock.lock(); + try { + this.userClosed = true; + } finally { + this.lock.unlock(); + } if (getKind() == Kind.CONNECTION_WORKER) { connectionWorker().close(); } else { @@ -363,6 +378,7 @@ public ApiFuture append(ProtoRows rows) { * @return the append response wrapped in a future. */ public ApiFuture append(ProtoRows rows, long offset) { + this. return this.singleConnectionOrConnectionPool.append(this, rows, offset); } @@ -398,6 +414,25 @@ public String getLocation() { return location; } + /** @return if a stream writer can no longer be used for writing. It is due to either the + * StreamWriter is explicitly closed or the underlying connection is broken when connection pool + * is not used. Client should recreate StreamWriter in this case. + */ + public Boolean isDone() { + this.lock.lock(); + try { + if (singleConnectionOrConnectionPool.getKind() == Kind.CONNECTION_WORKER) { + return userClose || singleConnectionOrConnectionPool.connectionWorker() + .isConnectionInUnrecoverableState(); + } else { + // With ConnectionPool, we will replace the bad connection automatically. + return userClose; + } + } finally { + this.lock.unlock(); + } + } + /** Close the stream writer. Shut down all resources. */ @Override public void close() { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index ce7f233af6..810d1644cf 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -54,6 +54,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Lock; import java.util.logging.Logger; import org.junit.After; import org.junit.Assert; @@ -100,6 +101,7 @@ public class StreamWriterTest { ProtoSchemaConverter.convert( BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor( UPDATED_TABLE_SCHEMA)); + private Lock lock; public StreamWriterTest() throws DescriptorValidationException {} From 22ff342bc0638f75aa42955d2484c1d7e502dd18 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Fri, 3 Feb 2023 21:00:14 +0000 Subject: [PATCH 2/9] . --- .../bigquery/storage/v1/ConnectionWorker.java | 13 ++++-- .../bigquery/storage/v1/StreamWriter.java | 45 ++++++++++--------- 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index bb70159213..c1f5e78a90 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -273,7 +273,7 @@ ApiFuture append( return appendInternal(requestBuilder.build()); } - Boolean isClosed() { + Boolean isUserClosed() { this.lock.lock(); try { return userClosed; @@ -387,8 +387,13 @@ public String getWriterId() { } boolean isConnectionInUnrecoverableState() { - // If final status is set, there's no - return connectionFinalStatus != null; + this.lock.lock(); + try { + // If final status is set, there's no + return connectionFinalStatus != null; + } finally { + this.lock.unlock(); + } } /** Close the stream writer. Shut down all resources. */ @@ -802,7 +807,7 @@ synchronized TableSchemaAndTimestamp getUpdatedSchema() { } // Class that wraps AppendRowsRequest and its corresponding Response future. - private static final class AppendRequestAndResponse { + static final class AppendRequestAndResponse { final SettableApiFuture appendResult; final AppendRowsRequest message; final long messageSize; diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 1bcaff52d1..208c88c5d5 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -22,6 +22,7 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.auto.value.AutoOneOf; import com.google.auto.value.AutoValue; +import com.google.cloud.bigquery.storage.v1.ConnectionWorker.AppendRequestAndResponse; import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp; import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind; import com.google.common.annotations.VisibleForTesting; @@ -37,6 +38,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.logging.Logger; import java.util.regex.Matcher; @@ -75,7 +77,7 @@ public class StreamWriter implements AutoCloseable { /* * If user has closed the StreamWriter. */ - private boolean userClose; + private AtomicBoolean userClosed; /* * A String that uniquely identifies this writer. @@ -151,12 +153,7 @@ public ApiFuture append( } public void close(StreamWriter streamWriter) { - this.lock.lock(); - try { - this.userClosed = true; - } finally { - this.lock.unlock(); - } + this.userClosed.set(true); if (getKind() == Kind.CONNECTION_WORKER) { connectionWorker().close(); } else { @@ -378,7 +375,15 @@ public ApiFuture append(ProtoRows rows) { * @return the append response wrapped in a future. */ public ApiFuture append(ProtoRows rows, long offset) { - this. + if (userClosed.get()) { + AppendRequestAndResponse requestWrapper = + new AppendRequestAndResponse(AppendRowsRequest.newBuilder().build()); + requestWrapper.appendResult.setException( + new StatusRuntimeException( + Status.fromCode(Code.FAILED_PRECONDITION) + .withDescription("User Closed streamWriter"))); + return requestWrapper.appendResult; + } return this.singleConnectionOrConnectionPool.append(this, rows, offset); } @@ -414,22 +419,18 @@ public String getLocation() { return location; } - /** @return if a stream writer can no longer be used for writing. It is due to either the - * StreamWriter is explicitly closed or the underlying connection is broken when connection pool - * is not used. Client should recreate StreamWriter in this case. + /** + * @return if a stream writer can no longer be used for writing. It is due to either the + * StreamWriter is explicitly closed or the underlying connection is broken when connection + * pool is not used. Client should recreate StreamWriter in this case. */ public Boolean isDone() { - this.lock.lock(); - try { - if (singleConnectionOrConnectionPool.getKind() == Kind.CONNECTION_WORKER) { - return userClose || singleConnectionOrConnectionPool.connectionWorker() - .isConnectionInUnrecoverableState(); - } else { - // With ConnectionPool, we will replace the bad connection automatically. - return userClose; - } - } finally { - this.lock.unlock(); + if (singleConnectionOrConnectionPool.getKind() == Kind.CONNECTION_WORKER) { + return userClosed.get() + || singleConnectionOrConnectionPool.connectionWorker().isConnectionInUnrecoverableState(); + } else { + // With ConnectionPool, we will replace the bad connection automatically. + return userClosed.get(); } } From f789ec84da7bc4c684e177684e7d8629e2976730 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Fri, 3 Feb 2023 21:50:11 +0000 Subject: [PATCH 3/9] . --- .../storage/v1/ConnectionWorkerPool.java | 2 +- .../bigquery/storage/v1/StreamWriter.java | 6 +- .../storage/v1/FakeBigQueryWrite.java | 5 ++ .../storage/v1/FakeBigQueryWriteImpl.java | 9 +- .../bigquery/storage/v1/StreamWriterTest.java | 86 ++++++++++++++++++- 5 files changed, 100 insertions(+), 8 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index c4e68bb189..fa2729aad9 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -379,7 +379,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w connectionWorkerPool.add(connectionWorker); log.info( String.format( - "Scaling up new connection for stream name: %s, pool size after scaling up %s", + "Scaling up new connection for stream name: %s, pool size after scaling up %d", streamName, connectionWorkerPool.size())); return connectionWorker; } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 208c88c5d5..eb79d8d79b 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -77,7 +77,7 @@ public class StreamWriter implements AutoCloseable { /* * If user has closed the StreamWriter. */ - private AtomicBoolean userClosed; + private AtomicBoolean userClosed = new AtomicBoolean(false); /* * A String that uniquely identifies this writer. @@ -153,7 +153,6 @@ public ApiFuture append( } public void close(StreamWriter streamWriter) { - this.userClosed.set(true); if (getKind() == Kind.CONNECTION_WORKER) { connectionWorker().close(); } else { @@ -424,7 +423,7 @@ public String getLocation() { * StreamWriter is explicitly closed or the underlying connection is broken when connection * pool is not used. Client should recreate StreamWriter in this case. */ - public Boolean isDone() { + public boolean isDone() { if (singleConnectionOrConnectionPool.getKind() == Kind.CONNECTION_WORKER) { return userClosed.get() || singleConnectionOrConnectionPool.connectionWorker().isConnectionInUnrecoverableState(); @@ -437,6 +436,7 @@ public Boolean isDone() { /** Close the stream writer. Shut down all resources. */ @Override public void close() { + userClosed.set(true); singleConnectionOrConnectionPool.close(this); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java index d707bbf976..5f697185f1 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java @@ -18,6 +18,7 @@ import com.google.api.gax.grpc.testing.MockGrpcService; import com.google.protobuf.AbstractMessage; import io.grpc.ServerServiceDefinition; +import io.grpc.Status; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ScheduledExecutorService; @@ -102,4 +103,8 @@ public long getConnectionCount() { public void setExecutor(ScheduledExecutorService executor) { serviceImpl.setExecutor(executor); } + + public void setFailedStatus(Status failedStatus) { + serviceImpl.setFailedStatus(failedStatus); + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java index db900100ad..e406fb03b6 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java @@ -62,6 +62,7 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase { // Record whether the first record has been seen on a connection. private final Map, Boolean> connectionToFirstRequest = new ConcurrentHashMap<>(); + private Status failedStatus = Status.ABORTED; /** Class used to save the state of a possible response. */ private static class Response { @@ -138,6 +139,10 @@ public long getConnectionCount() { return connectionCount; } + public void setFailedStatus(Status failedStatus) { + this.failedStatus = failedStatus; + } + @Override public StreamObserver appendRows( final StreamObserver responseObserver) { @@ -177,10 +182,10 @@ public void onNext(AppendRowsRequest value) { && recordCount % closeAfter == 0 && (numberTimesToClose == 0 || connectionCount <= numberTimesToClose)) { LOG.info("Shutting down connection from test..."); - responseObserver.onError(Status.ABORTED.asException()); + responseObserver.onError(failedStatus.asException()); } else if (closeForeverAfter > 0 && recordCount > closeForeverAfter) { LOG.info("Shutting down connection from test..."); - responseObserver.onError(Status.ABORTED.asException()); + responseObserver.onError(failedStatus.asException()); } else { final Response response = responses.get(offset); sendResponse(response, responseObserver); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 810d1644cf..94bc26424b 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -54,7 +54,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.locks.Lock; import java.util.logging.Logger; import org.junit.After; import org.junit.Assert; @@ -101,7 +100,6 @@ public class StreamWriterTest { ProtoSchemaConverter.convert( BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor( UPDATED_TABLE_SCHEMA)); - private Lock lock; public StreamWriterTest() throws DescriptorValidationException {} @@ -1227,4 +1225,88 @@ public void testCloseDisconnectedStream() throws Exception { // Ensure closing the writer after disconnect succeeds. writer.close(); } + + @Test(timeout = 10000) + public void testStreamWriterUserCloseMultiplexing() throws Exception { + StreamWriter writer = + StreamWriter.newBuilder(TEST_STREAM_1) + .setWriterSchema(createProtoSchema()) + .setEnableConnectionPool(true) + .setLocation("us") + .build(); + + writer.close(); + assertTrue(writer.isDone()); + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> { + appendFuture1.get(); + }); + assertEquals( + Status.Code.FAILED_PRECONDITION, + ((StatusRuntimeException) ex.getCause()).getStatus().getCode()); + } + + @Test(timeout = 10000) + public void testStreamWriterUserCloseNoMultiplexing() throws Exception { + StreamWriter writer = + StreamWriter.newBuilder(TEST_STREAM_1).setWriterSchema(createProtoSchema()).build(); + + writer.close(); + assertTrue(writer.isDone()); + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> { + appendFuture1.get(); + }); + assertEquals( + Status.Code.FAILED_PRECONDITION, + ((StatusRuntimeException) ex.getCause()).getStatus().getCode()); + } + + @Test(timeout = 10000) + public void testStreamWriterPermanentErrorMultiplexing() throws Exception { + StreamWriter writer = + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setWriterSchema(createProtoSchema()) + .setEnableConnectionPool(true) + .setLocation("us") + .build(); + testBigQueryWrite.setCloseForeverAfter(1); + // Permenant errror. + testBigQueryWrite.setFailedStatus(Status.INVALID_ARGUMENT); + testBigQueryWrite.addResponse(createAppendResponse(0)); + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + appendFuture1.get(); + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"A"}); + assertThrows( + ExecutionException.class, + () -> { + appendFuture2.get(); + }); + assertFalse(writer.isDone()); + } + + @Test(timeout = 10000) + public void testStreamWriterPermanentErrorNoMultiplexing() throws Exception { + StreamWriter writer = + StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(createProtoSchema()).build(); + testBigQueryWrite.setCloseForeverAfter(1); + // Permenant errror. + testBigQueryWrite.setFailedStatus(Status.INVALID_ARGUMENT); + testBigQueryWrite.addResponse(createAppendResponse(0)); + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + appendFuture1.get(); + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"A"}); + assertThrows( + ExecutionException.class, + () -> { + appendFuture2.get(); + }); + assertTrue(writer.isDone()); + } } From ec4c76fa65b39c206f06ef372eeae49947bb7638 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Fri, 3 Feb 2023 21:53:54 +0000 Subject: [PATCH 4/9] . --- .../java/com/google/cloud/bigquery/storage/v1/StreamWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index eb79d8d79b..1320e0238a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -380,7 +380,7 @@ public ApiFuture append(ProtoRows rows, long offset) { requestWrapper.appendResult.setException( new StatusRuntimeException( Status.fromCode(Code.FAILED_PRECONDITION) - .withDescription("User Closed streamWriter"))); + .withDescription("User slosed StreamWriter"))); return requestWrapper.appendResult; } return this.singleConnectionOrConnectionPool.append(this, rows, offset); From 3bbddcdd142c118715b9e1c7d91f5f71ada02fc8 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Fri, 3 Feb 2023 21:59:38 +0000 Subject: [PATCH 5/9] . --- .../google/cloud/bigquery/storage/v1/StreamWriterTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 94bc26424b..27c1216ac0 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -30,6 +30,7 @@ import com.google.api.gax.grpc.testing.MockServiceHelper; import com.google.api.gax.rpc.AbortedException; import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.InvalidArgumentException; import com.google.api.gax.rpc.StatusCode.Code; import com.google.api.gax.rpc.UnknownException; import com.google.cloud.bigquery.storage.test.Test.FooType; @@ -1283,11 +1284,12 @@ public void testStreamWriterPermanentErrorMultiplexing() throws Exception { ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); appendFuture1.get(); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"A"}); - assertThrows( + ExecutionException ex = assertThrows( ExecutionException.class, () -> { appendFuture2.get(); }); + assertTrue(ex.getCause() instanceof InvalidArgumentException); assertFalse(writer.isDone()); } @@ -1302,11 +1304,12 @@ public void testStreamWriterPermanentErrorNoMultiplexing() throws Exception { ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); appendFuture1.get(); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"A"}); - assertThrows( + ExecutionException ex = assertThrows( ExecutionException.class, () -> { appendFuture2.get(); }); assertTrue(writer.isDone()); + assertTrue(ex.getCause() instanceof InvalidArgumentException); } } From f5f7feecdcfcf5e202045aabf0875cca37838361 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Fri, 3 Feb 2023 22:07:10 +0000 Subject: [PATCH 6/9] . --- .../bigquery/storage/v1/StreamWriterTest.java | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 27c1216ac0..5542a7a2ea 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -1284,11 +1284,12 @@ public void testStreamWriterPermanentErrorMultiplexing() throws Exception { ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); appendFuture1.get(); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"A"}); - ExecutionException ex = assertThrows( - ExecutionException.class, - () -> { - appendFuture2.get(); - }); + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> { + appendFuture2.get(); + }); assertTrue(ex.getCause() instanceof InvalidArgumentException); assertFalse(writer.isDone()); } @@ -1304,11 +1305,12 @@ public void testStreamWriterPermanentErrorNoMultiplexing() throws Exception { ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); appendFuture1.get(); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"A"}); - ExecutionException ex = assertThrows( - ExecutionException.class, - () -> { - appendFuture2.get(); - }); + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> { + appendFuture2.get(); + }); assertTrue(writer.isDone()); assertTrue(ex.getCause() instanceof InvalidArgumentException); } From 4c4f772de4db186befb5d6c0e41bc360576574ba Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Fri, 3 Feb 2023 22:27:44 +0000 Subject: [PATCH 7/9] fix test failure --- .../google/cloud/bigquery/storage/v1/StreamWriter.java | 9 ++++++--- .../cloud/bigquery/storage/v1/StreamWriterTest.java | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 1320e0238a..1b9f61b583 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -24,6 +24,7 @@ import com.google.auto.value.AutoValue; import com.google.cloud.bigquery.storage.v1.ConnectionWorker.AppendRequestAndResponse; import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp; +import com.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException; import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -378,9 +379,11 @@ public ApiFuture append(ProtoRows rows, long offset) { AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(AppendRowsRequest.newBuilder().build()); requestWrapper.appendResult.setException( - new StatusRuntimeException( - Status.fromCode(Code.FAILED_PRECONDITION) - .withDescription("User slosed StreamWriter"))); + new Exceptions.StreamWriterClosedException( + Status.fromCode(Status.Code.FAILED_PRECONDITION) + .withDescription("User closed StreamWriter"), + streamName, + getWriterId())); return requestWrapper.appendResult; } return this.singleConnectionOrConnectionPool.append(this, rows, offset); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 5542a7a2ea..6dfbec2d33 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -1038,7 +1038,7 @@ public void testWriterAlreadyClosedException() throws Exception { // The basic StatusRuntimeException API is not changed. assertTrue(actualError instanceof StatusRuntimeException); assertEquals(Status.Code.FAILED_PRECONDITION, actualError.getStatus().getCode()); - assertTrue(actualError.getStatus().getDescription().contains("Connection is already closed")); + assertTrue(actualError.getStatus().getDescription().contains("User closed StreamWriter")); assertEquals(actualError.getWriterId(), writer.getWriterId()); assertEquals(actualError.getStreamName(), writer.getStreamName()); } From 194dd8ce1f02c406da3b38b4947c1c905d7e3c7f Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Mon, 6 Feb 2023 19:49:02 +0000 Subject: [PATCH 8/9] . --- .../com/google/cloud/bigquery/storage/v1/StreamWriter.java | 1 - .../google/cloud/bigquery/storage/v1/StreamWriterTest.java | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 1b9f61b583..e09467981c 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -24,7 +24,6 @@ import com.google.auto.value.AutoValue; import com.google.cloud.bigquery.storage.v1.ConnectionWorker.AppendRequestAndResponse; import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp; -import com.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException; import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 6dfbec2d33..731d6d6364 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -1230,7 +1230,7 @@ public void testCloseDisconnectedStream() throws Exception { @Test(timeout = 10000) public void testStreamWriterUserCloseMultiplexing() throws Exception { StreamWriter writer = - StreamWriter.newBuilder(TEST_STREAM_1) + StreamWriter.newBuilder(TEST_STREAM_1, client) .setWriterSchema(createProtoSchema()) .setEnableConnectionPool(true) .setLocation("us") @@ -1253,7 +1253,7 @@ public void testStreamWriterUserCloseMultiplexing() throws Exception { @Test(timeout = 10000) public void testStreamWriterUserCloseNoMultiplexing() throws Exception { StreamWriter writer = - StreamWriter.newBuilder(TEST_STREAM_1).setWriterSchema(createProtoSchema()).build(); + StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(createProtoSchema()).build(); writer.close(); assertTrue(writer.isDone()); From adad20a60e47399cbbec8f380d9e9d5dc487b01e Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Mon, 6 Feb 2023 19:56:34 +0000 Subject: [PATCH 9/9] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 712bb3034e..908faab2c6 100644 --- a/README.md +++ b/README.md @@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.5.0') +implementation platform('com.google.cloud:libraries-bom:26.6.0') implementation 'com.google.cloud:google-cloud-bigquerystorage' ``` If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.4' +implementation 'com.google.cloud:google-cloud-bigquerystorage:2.29.0' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.4" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.29.0" ``` ## Authentication