From 7c092d60021b1a9901d911cb2a7babab78c5b7f9 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Tue, 7 Feb 2023 22:16:35 +0000 Subject: [PATCH 1/5] fix: refactor only, add StreamWriter to AppendRequestResponse, so that we could callback on StreamWriter to manage its close --- .../bigquery/storage/v1/ConnectionWorker.java | 19 +++++++---- .../storage/v1/ConnectionWorkerPool.java | 2 +- .../bigquery/storage/v1/StreamWriter.java | 4 +-- .../storage/v1/ConnectionWorkerTest.java | 34 +++++++++---------- 4 files changed, 31 insertions(+), 28 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 32f749c3f1..1fb385b3de 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -268,15 +268,15 @@ public void run(Throwable finalStatus) { /** Schedules the writing of rows at given offset. */ ApiFuture append( - String streamName, ProtoSchema writerSchema, ProtoRows rows, long offset) { + StreamWriter streamWriter, ProtoRows rows, long offset) { AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); requestBuilder.setProtoRows( - ProtoData.newBuilder().setWriterSchema(writerSchema).setRows(rows).build()); + ProtoData.newBuilder().setWriterSchema(streamWriter.getProtoSchema()).setRows(rows).build()); if (offset >= 0) { requestBuilder.setOffset(Int64Value.of(offset)); } - requestBuilder.setWriteStream(streamName); - return appendInternal(requestBuilder.build()); + requestBuilder.setWriteStream(streamWriter.getStreamName()); + return appendInternal(streamWriter, requestBuilder.build()); } Boolean isUserClosed() { @@ -288,8 +288,9 @@ Boolean isUserClosed() { } } - private ApiFuture appendInternal(AppendRowsRequest message) { - AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message); + private ApiFuture appendInternal(StreamWriter streamWriter, + AppendRowsRequest message) { + AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message, streamWriter); if (requestWrapper.messageSize > getApiMaxRequestBytes()) { requestWrapper.appendResult.setException( new StatusRuntimeException( @@ -840,10 +841,14 @@ static final class AppendRequestAndResponse { final AppendRowsRequest message; final long messageSize; - AppendRequestAndResponse(AppendRowsRequest message) { + // The writer that issues the call of the request. + final StreamWriter streamWriter; + + AppendRequestAndResponse(AppendRowsRequest message, StreamWriter streamWriter) { this.appendResult = SettableApiFuture.create(); this.message = message; this.messageSize = message.getProtoRows().getSerializedSize(); + this.streamWriter = streamWriter; } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index fa2729aad9..4a176c3001 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -265,7 +265,7 @@ ApiFuture append(StreamWriter streamWriter, ProtoRows rows, Stopwatch stopwatch = Stopwatch.createStarted(); ApiFuture responseFuture = connectionWorker.append( - streamWriter.getStreamName(), streamWriter.getProtoSchema(), rows, offset); + streamWriter, rows, offset); return ApiFutures.transform( responseFuture, // Add callback for update schema diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index e09467981c..6572537c71 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -146,7 +146,7 @@ public ApiFuture append( StreamWriter streamWriter, ProtoRows protoRows, long offset) { if (getKind() == Kind.CONNECTION_WORKER) { return connectionWorker() - .append(streamWriter.getStreamName(), streamWriter.getProtoSchema(), protoRows, offset); + .append(streamWriter, protoRows, offset); } else { return connectionWorkerPool().append(streamWriter, protoRows, offset); } @@ -376,7 +376,7 @@ public ApiFuture append(ProtoRows rows) { public ApiFuture append(ProtoRows rows, long offset) { if (userClosed.get()) { AppendRequestAndResponse requestWrapper = - new AppendRequestAndResponse(AppendRowsRequest.newBuilder().build()); + new AppendRequestAndResponse(AppendRowsRequest.newBuilder().build(), this); requestWrapper.appendResult.setException( new Exceptions.StreamWriterClosedException( Status.fromCode(Status.Code.FAILED_PRECONDITION) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 6cc3247279..abc0adc353 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -80,6 +80,8 @@ public void testMultiplexedAppendSuccess() throws Exception { testBigQueryWrite.addResponse(createAppendResponse(i)); } List> futures = new ArrayList<>(); + StreamWriter sw1 = StreamWriter.newBuilder(TEST_STREAM_1).setWriterSchema(createProtoSchema("foo")).build(); + StreamWriter sw2 = StreamWriter.newBuilder(TEST_STREAM_2).setWriterSchema(createProtoSchema("complicate")).build(); // We do a pattern of: // send to stream1, string1 // send to stream1, string2 @@ -95,8 +97,7 @@ public void testMultiplexedAppendSuccess() throws Exception { futures.add( sendTestMessage( connectionWorker, - TEST_STREAM_1, - createProtoSchema("foo"), + sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i)); break; @@ -105,8 +106,7 @@ public void testMultiplexedAppendSuccess() throws Exception { futures.add( sendTestMessage( connectionWorker, - TEST_STREAM_2, - createProtoSchema("complicate"), + sw2, createComplicateTypeProtoRows(new String[] {String.valueOf(i)}), i)); break; @@ -197,14 +197,16 @@ public void testAppendInSameStream_switchSchema() throws Exception { // send to stream1, schema3 // send to stream1, schema1 // ... + StreamWriter sw1 = StreamWriter.newBuilder(TEST_STREAM_1).setWriterSchema(schema1).build(); + StreamWriter sw2 = StreamWriter.newBuilder(TEST_STREAM_1).setWriterSchema(schema2).build(); + StreamWriter sw3 = StreamWriter.newBuilder(TEST_STREAM_1).setWriterSchema(schema3).build(); for (long i = 0; i < appendCount; i++) { switch ((int) i % 4) { case 0: futures.add( sendTestMessage( connectionWorker, - TEST_STREAM_1, - schema1, + sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i)); break; @@ -212,8 +214,7 @@ public void testAppendInSameStream_switchSchema() throws Exception { futures.add( sendTestMessage( connectionWorker, - TEST_STREAM_1, - schema2, + sw2, createFooProtoRows(new String[] {String.valueOf(i)}), i)); break; @@ -222,8 +223,7 @@ public void testAppendInSameStream_switchSchema() throws Exception { futures.add( sendTestMessage( connectionWorker, - TEST_STREAM_1, - schema3, + sw3, createFooProtoRows(new String[] {String.valueOf(i)}), i)); break; @@ -293,6 +293,8 @@ public void testAppendInSameStream_switchSchema() throws Exception { @Test public void testAppendButInflightQueueFull() throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + StreamWriter sw1 = StreamWriter.newBuilder(TEST_STREAM_1).setWriterSchema(schema1).build(); ConnectionWorker connectionWorker = new ConnectionWorker( TEST_STREAM_1, @@ -305,7 +307,6 @@ public void testAppendButInflightQueueFull() throws Exception { client.getSettings()); testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1)); ConnectionWorker.setMaxInflightQueueWaitTime(500); - ProtoSchema schema1 = createProtoSchema("foo"); long appendCount = 6; for (int i = 0; i < appendCount; i++) { @@ -323,8 +324,7 @@ public void testAppendButInflightQueueFull() throws Exception { () -> { sendTestMessage( connectionWorker, - TEST_STREAM_1, - schema1, + sw1, createFooProtoRows(new String[] {String.valueOf(5)}), 5); }); @@ -335,8 +335,7 @@ public void testAppendButInflightQueueFull() throws Exception { futures.add( sendTestMessage( connectionWorker, - TEST_STREAM_1, - schema1, + sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i)); assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), i + 1); @@ -396,11 +395,10 @@ private ProtoSchema createProtoSchema(String protoName) { private ApiFuture sendTestMessage( ConnectionWorker connectionWorker, - String streamName, - ProtoSchema protoSchema, + StreamWriter streamWriter, ProtoRows protoRows, long offset) { - return connectionWorker.append(streamName, protoSchema, protoRows, offset); + return connectionWorker.append(streamWriter, protoRows, offset); } private ProtoRows createFooProtoRows(String[] messages) { From b4b5c0c7182f9db4cd2fc53f5d85f941d7867cad Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Tue, 7 Feb 2023 22:17:20 +0000 Subject: [PATCH 2/5] . --- .../bigquery/storage/v1/ConnectionWorker.java | 12 +++++++----- .../storage/v1/ConnectionWorkerPool.java | 3 +-- .../bigquery/storage/v1/StreamWriter.java | 3 +-- .../storage/v1/ConnectionWorkerTest.java | 18 ++++++++---------- 4 files changed, 17 insertions(+), 19 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 1fb385b3de..f085e8f064 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -267,11 +267,13 @@ public void run(Throwable finalStatus) { } /** Schedules the writing of rows at given offset. */ - ApiFuture append( - StreamWriter streamWriter, ProtoRows rows, long offset) { + ApiFuture append(StreamWriter streamWriter, ProtoRows rows, long offset) { AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); requestBuilder.setProtoRows( - ProtoData.newBuilder().setWriterSchema(streamWriter.getProtoSchema()).setRows(rows).build()); + ProtoData.newBuilder() + .setWriterSchema(streamWriter.getProtoSchema()) + .setRows(rows) + .build()); if (offset >= 0) { requestBuilder.setOffset(Int64Value.of(offset)); } @@ -288,8 +290,8 @@ Boolean isUserClosed() { } } - private ApiFuture appendInternal(StreamWriter streamWriter, - AppendRowsRequest message) { + private ApiFuture appendInternal( + StreamWriter streamWriter, AppendRowsRequest message) { AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message, streamWriter); if (requestWrapper.messageSize > getApiMaxRequestBytes()) { requestWrapper.appendResult.setException( diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index 4a176c3001..8fcb84165e 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -264,8 +264,7 @@ ApiFuture append(StreamWriter streamWriter, ProtoRows rows, } Stopwatch stopwatch = Stopwatch.createStarted(); ApiFuture responseFuture = - connectionWorker.append( - streamWriter, rows, offset); + connectionWorker.append(streamWriter, rows, offset); return ApiFutures.transform( responseFuture, // Add callback for update schema diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 6572537c71..80e6def185 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -145,8 +145,7 @@ public enum Kind { public ApiFuture append( StreamWriter streamWriter, ProtoRows protoRows, long offset) { if (getKind() == Kind.CONNECTION_WORKER) { - return connectionWorker() - .append(streamWriter, protoRows, offset); + return connectionWorker().append(streamWriter, protoRows, offset); } else { return connectionWorkerPool().append(streamWriter, protoRows, offset); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index abc0adc353..424a27d0a9 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -80,8 +80,12 @@ public void testMultiplexedAppendSuccess() throws Exception { testBigQueryWrite.addResponse(createAppendResponse(i)); } List> futures = new ArrayList<>(); - StreamWriter sw1 = StreamWriter.newBuilder(TEST_STREAM_1).setWriterSchema(createProtoSchema("foo")).build(); - StreamWriter sw2 = StreamWriter.newBuilder(TEST_STREAM_2).setWriterSchema(createProtoSchema("complicate")).build(); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1).setWriterSchema(createProtoSchema("foo")).build(); + StreamWriter sw2 = + StreamWriter.newBuilder(TEST_STREAM_2) + .setWriterSchema(createProtoSchema("complicate")) + .build(); // We do a pattern of: // send to stream1, string1 // send to stream1, string2 @@ -323,10 +327,7 @@ public void testAppendButInflightQueueFull() throws Exception { StatusRuntimeException.class, () -> { sendTestMessage( - connectionWorker, - sw1, - createFooProtoRows(new String[] {String.valueOf(5)}), - 5); + connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(5)}), 5); }); long timeDiff = System.currentTimeMillis() - startTime; assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), 5); @@ -334,10 +335,7 @@ public void testAppendButInflightQueueFull() throws Exception { } else { futures.add( sendTestMessage( - connectionWorker, - sw1, - createFooProtoRows(new String[] {String.valueOf(i)}), - i)); + connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i)); assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), i + 1); } } From 752b1c6db2c2c76a5aee534971c7d7f5d6647afc Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Tue, 7 Feb 2023 22:22:52 +0000 Subject: [PATCH 3/5] . --- .../com/google/cloud/bigquery/storage/v1/ConnectionWorker.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index f085e8f064..05390c56aa 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -24,6 +24,7 @@ import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback; import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.Int64Value; import io.grpc.Status; @@ -268,6 +269,7 @@ public void run(Throwable finalStatus) { /** Schedules the writing of rows at given offset. */ ApiFuture append(StreamWriter streamWriter, ProtoRows rows, long offset) { + Preconditions.checkNotNull(streamWriter); AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); requestBuilder.setProtoRows( ProtoData.newBuilder() From 817211e01e9e128087bab4b64b8154364b8bc356 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Tue, 7 Feb 2023 22:39:33 +0000 Subject: [PATCH 4/5] . --- .../storage/v1/ConnectionWorkerTest.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 424a27d0a9..4edf0f3e9d 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -81,9 +81,11 @@ public void testMultiplexedAppendSuccess() throws Exception { } List> futures = new ArrayList<>(); StreamWriter sw1 = - StreamWriter.newBuilder(TEST_STREAM_1).setWriterSchema(createProtoSchema("foo")).build(); + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setWriterSchema(createProtoSchema("foo")) + .build(); StreamWriter sw2 = - StreamWriter.newBuilder(TEST_STREAM_2) + StreamWriter.newBuilder(TEST_STREAM_2, client) .setWriterSchema(createProtoSchema("complicate")) .build(); // We do a pattern of: @@ -201,9 +203,12 @@ public void testAppendInSameStream_switchSchema() throws Exception { // send to stream1, schema3 // send to stream1, schema1 // ... - StreamWriter sw1 = StreamWriter.newBuilder(TEST_STREAM_1).setWriterSchema(schema1).build(); - StreamWriter sw2 = StreamWriter.newBuilder(TEST_STREAM_1).setWriterSchema(schema2).build(); - StreamWriter sw3 = StreamWriter.newBuilder(TEST_STREAM_1).setWriterSchema(schema3).build(); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build(); + StreamWriter sw2 = + StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema2).build(); + StreamWriter sw3 = + StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema3).build(); for (long i = 0; i < appendCount; i++) { switch ((int) i % 4) { case 0: @@ -298,7 +303,8 @@ public void testAppendInSameStream_switchSchema() throws Exception { @Test public void testAppendButInflightQueueFull() throws Exception { ProtoSchema schema1 = createProtoSchema("foo"); - StreamWriter sw1 = StreamWriter.newBuilder(TEST_STREAM_1).setWriterSchema(schema1).build(); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build(); ConnectionWorker connectionWorker = new ConnectionWorker( TEST_STREAM_1, From 1e92e29720730524fa33a8ac096328728d62d860 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Fri, 10 Feb 2023 18:56:16 +0000 Subject: [PATCH 5/5] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 908faab2c6..bd1f5bbb21 100644 --- a/README.md +++ b/README.md @@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.6.0') +implementation platform('com.google.cloud:libraries-bom:26.7.0') implementation 'com.google.cloud:google-cloud-bigquerystorage' ``` If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-bigquerystorage:2.29.0' +implementation 'com.google.cloud:google-cloud-bigquerystorage:2.31.0' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.29.0" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.31.0" ``` ## Authentication