From a0676f28492b3502322e9ed82ecc46cc0e4a0e96 Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 8 Feb 2022 08:10:36 -0800 Subject: [PATCH 01/18] . --- .../bigquery/storage/v1/JsonStreamWriter.java | 40 +++++++++++++- .../it/ITBigQueryWriteManualClientTest.java | 53 +++++++++++++++++++ 2 files changed, 91 insertions(+), 2 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 31d1d2493f..32a9ca2147 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -18,6 +18,7 @@ import com.google.api.core.ApiFuture; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.rpc.InvalidArgumentException; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.common.base.Preconditions; import com.google.protobuf.Descriptors; @@ -53,6 +54,9 @@ public class JsonStreamWriter implements AutoCloseable { private Descriptor descriptor; private TableSchema tableSchema; private boolean ignoreUnknownFields = false; + private boolean reconnectOnStuck = false; + private long totalMessageSize = 0; + private ProtoSchema protoSchema; /** * Constructs the JsonStreamWriter @@ -71,7 +75,9 @@ private JsonStreamWriter(Builder builder) } else { streamWriterBuilder = StreamWriter.newBuilder(builder.streamName, builder.client); } - streamWriterBuilder.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor)); + this.protoSchema = ProtoSchemaConverter.convert(this.descriptor); + this.totalMessageSize = protoSchema.getSerializedSize(); + streamWriterBuilder.setWriterSchema(protoSchema); setStreamWriterSettings( builder.channelProvider, builder.credentialsProvider, @@ -82,6 +88,7 @@ private JsonStreamWriter(Builder builder) this.streamName = builder.streamName; this.tableSchema = builder.tableSchema; this.ignoreUnknownFields = builder.ignoreUnknownFields; + this.reconnectOnStuck = builder.reconnectOnStuck; } /** @@ -122,10 +129,12 @@ public ApiFuture append(JSONArray jsonArr, long offset) this.tableSchema = updatedSchema; this.descriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(updatedSchema); + this.protoSchema = ProtoSchemaConverter.convert(this.descriptor); + this.totalMessageSize = protoSchema.getSerializedSize(); // Create a new underlying StreamWriter with the updated TableSchema and Descriptor this.streamWriter = streamWriterBuilder - .setWriterSchema(ProtoSchemaConverter.convert(this.descriptor)) + .setWriterSchema(this.protoSchema) .build(); } } @@ -134,15 +143,30 @@ public ApiFuture append(JSONArray jsonArr, long offset) // Any error in convertJsonToProtoMessage will throw an // IllegalArgumentException/IllegalStateException/NullPointerException and will halt processing // of JSON data. + long currentRequestSize = 0; for (int i = 0; i < jsonArr.length(); i++) { JSONObject json = jsonArr.getJSONObject(i); Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage( this.descriptor, this.tableSchema, json, ignoreUnknownFields); rowsBuilder.addSerializedRows(protoMessage.toByteString()); + currentRequestSize += protoMessage.getSerializedSize(); } // Need to make sure refreshAppendAndSetDescriptor finish first before this can run synchronized (this) { + this.totalMessageSize += currentRequestSize; + // Reconnect on every 9.5MB. + if (this.totalMessageSize > 9500000 && this.reconnectOnStuck) { + LOG.info("reconnecting"); + streamWriter.close(); + // Create a new underlying StreamWriter with the updated TableSchema and Descriptor + this.streamWriter = + streamWriterBuilder + .setWriterSchema(protoSchema) + .build(); + this.totalMessageSize = this.protoSchema.getSerializedSize() + currentRequestSize; + // Allow first request to pass. + } final ApiFuture appendResponseFuture = this.streamWriter.append(rowsBuilder.build(), offset); return appendResponseFuture; @@ -264,6 +288,7 @@ public static final class Builder { private boolean createDefaultStream = false; private String traceId; private boolean ignoreUnknownFields = false; + private boolean reconnectOnStuck = false; private static String streamPatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+"; @@ -377,6 +402,17 @@ public Builder setIgnoreUnknownFields(boolean ignoreUnknownFields) { return this; } + /** + * Setter for a reconnectOnStuck, temporaily workaround for omg/48020 + * + * @param reconnectOnStuck + * @return Builder + */ + public Builder setReconnectOnStuck(boolean reconnectOnStuck) { + this.reconnectOnStuck = reconnectOnStuck; + return this; + } + /** * Builds JsonStreamWriter * diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index b20da6cbb4..48d6d8dde9 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -412,6 +412,59 @@ public void testJsonStreamWriterWithDefaultStream() } } + @Test + public void testJsonStreamWriterWithDefaultStreamLarge() + throws IOException, InterruptedException, ExecutionException, + Descriptors.DescriptorValidationException { + String tableName = "TableLarge"; + TableId tableId = TableId.of(DATASET, tableName); + Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build(); + Schema schema = Schema.of(col1); + TableInfo tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build(); + bigquery.create(tableInfo); + TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); + + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(parent.toString()) + .setWriteStream( + WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) + .build()); + int totalRequest = 100; + int rowBatch = 20000; + ArrayList> allResponses = + new ArrayList>(totalRequest); + // Sends a total of 150MB over the wire. + try (JsonStreamWriter jsonStreamWriter = + JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) + .setReconnectOnStuck(true) + .build()) { + for (int k = 0; k < totalRequest; k++) { + JSONObject row = new JSONObject(); + row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + JSONArray jsonArr = new JSONArray(); + // 1.5MB batch. + for (int j = 0; j < rowBatch; j++) { + jsonArr.put(row); + } + LOG.info("Appending: " + k +"/" + rowBatch); + allResponses.add(jsonStreamWriter.append(jsonArr, k * rowBatch)); + } + } + LOG.info("Waiting for all responses to come back"); + for (int i = 0; i < totalRequest; i++) { + try { + Assert.assertEquals( + allResponses.get(i).get().getAppendResult().getOffset().getValue(), i * rowBatch); + } catch (ExecutionException ex) { + if (ex.toString().contains("The offset is within stream,")) { + Assert.fail("Unexpected error " + ex); + } + } + } + } + @Test public void testJsonStreamWriterSchemaUpdate() throws DescriptorValidationException, IOException, InterruptedException, ExecutionException { From 81191fc74bd357ed82f952fa8504ff6811b44db3 Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 8 Feb 2022 08:15:37 -0800 Subject: [PATCH 02/18] . --- .../cloud/bigquery/storage/v1/JsonStreamWriter.java | 12 ++---------- .../v1/it/ITBigQueryWriteManualClientTest.java | 10 +++++----- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 32a9ca2147..094d5f2223 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -18,7 +18,6 @@ import com.google.api.core.ApiFuture; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.core.CredentialsProvider; -import com.google.api.gax.rpc.InvalidArgumentException; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.common.base.Preconditions; import com.google.protobuf.Descriptors; @@ -132,10 +131,7 @@ public ApiFuture append(JSONArray jsonArr, long offset) this.protoSchema = ProtoSchemaConverter.convert(this.descriptor); this.totalMessageSize = protoSchema.getSerializedSize(); // Create a new underlying StreamWriter with the updated TableSchema and Descriptor - this.streamWriter = - streamWriterBuilder - .setWriterSchema(this.protoSchema) - .build(); + this.streamWriter = streamWriterBuilder.setWriterSchema(this.protoSchema).build(); } } @@ -157,13 +153,9 @@ public ApiFuture append(JSONArray jsonArr, long offset) this.totalMessageSize += currentRequestSize; // Reconnect on every 9.5MB. if (this.totalMessageSize > 9500000 && this.reconnectOnStuck) { - LOG.info("reconnecting"); streamWriter.close(); // Create a new underlying StreamWriter with the updated TableSchema and Descriptor - this.streamWriter = - streamWriterBuilder - .setWriterSchema(protoSchema) - .build(); + this.streamWriter = streamWriterBuilder.setWriterSchema(protoSchema).build(); this.totalMessageSize = this.protoSchema.getSerializedSize() + currentRequestSize; // Allow first request to pass. } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index 48d6d8dde9..5ff459fae0 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -415,7 +415,7 @@ public void testJsonStreamWriterWithDefaultStream() @Test public void testJsonStreamWriterWithDefaultStreamLarge() throws IOException, InterruptedException, ExecutionException, - Descriptors.DescriptorValidationException { + Descriptors.DescriptorValidationException { String tableName = "TableLarge"; TableId tableId = TableId.of(DATASET, tableName); Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build(); @@ -437,9 +437,9 @@ public void testJsonStreamWriterWithDefaultStreamLarge() new ArrayList>(totalRequest); // Sends a total of 150MB over the wire. try (JsonStreamWriter jsonStreamWriter = - JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) - .setReconnectOnStuck(true) - .build()) { + JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) + .setReconnectOnStuck(true) + .build()) { for (int k = 0; k < totalRequest; k++) { JSONObject row = new JSONObject(); row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); @@ -448,7 +448,7 @@ public void testJsonStreamWriterWithDefaultStreamLarge() for (int j = 0; j < rowBatch; j++) { jsonArr.put(row); } - LOG.info("Appending: " + k +"/" + rowBatch); + LOG.info("Appending: " + k + "/" + rowBatch); allResponses.add(jsonStreamWriter.append(jsonArr, k * rowBatch)); } } From 4a62570caf97a6d6e99f05cbf5ef1257fe951abd Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 8 Feb 2022 10:47:10 -0800 Subject: [PATCH 03/18] . --- .../cloud/bigquery/storage/v1/JsonStreamWriter.java | 10 ++++++++++ .../storage/v1/it/ITBigQueryWriteManualClientTest.java | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 094d5f2223..1f0e17f9c4 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -55,6 +55,7 @@ public class JsonStreamWriter implements AutoCloseable { private boolean ignoreUnknownFields = false; private boolean reconnectOnStuck = false; private long totalMessageSize = 0; + private long absTotal = 0; private ProtoSchema protoSchema; /** @@ -151,14 +152,23 @@ public ApiFuture append(JSONArray jsonArr, long offset) // Need to make sure refreshAppendAndSetDescriptor finish first before this can run synchronized (this) { this.totalMessageSize += currentRequestSize; + this.absTotal += currentRequestSize; // Reconnect on every 9.5MB. if (this.totalMessageSize > 9500000 && this.reconnectOnStuck) { streamWriter.close(); // Create a new underlying StreamWriter with the updated TableSchema and Descriptor this.streamWriter = streamWriterBuilder.setWriterSchema(protoSchema).build(); this.totalMessageSize = this.protoSchema.getSerializedSize() + currentRequestSize; + this.absTotal += currentRequestSize; // Allow first request to pass. } + LOG.info( + "Sending a total of:" + + this.totalMessageSize + + " " + + currentRequestSize + + " " + + this.absTotal); final ApiFuture appendResponseFuture = this.streamWriter.append(rowsBuilder.build(), offset); return appendResponseFuture; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index 5ff459fae0..5b966f7a67 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -448,7 +448,7 @@ public void testJsonStreamWriterWithDefaultStreamLarge() for (int j = 0; j < rowBatch; j++) { jsonArr.put(row); } - LOG.info("Appending: " + k + "/" + rowBatch); + LOG.fine("Appending: " + k + "/" + totalRequest); allResponses.add(jsonStreamWriter.append(jsonArr, k * rowBatch)); } } From 5e8c370161cb7738cdf35b00f23c5752470b89e0 Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 8 Feb 2022 11:30:37 -0800 Subject: [PATCH 04/18] . --- .../com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 1f0e17f9c4..473ad41b6a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -162,7 +162,7 @@ public ApiFuture append(JSONArray jsonArr, long offset) this.absTotal += currentRequestSize; // Allow first request to pass. } - LOG.info( + LOG.fine( "Sending a total of:" + this.totalMessageSize + " " From 7693b563dd56f07d29d372b381d116d98d0cca31 Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 8 Feb 2022 11:37:21 -0800 Subject: [PATCH 05/18] . --- .../google/cloud/bigquery/storage/v1/JsonStreamWriter.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 473ad41b6a..644540ae8f 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -405,7 +405,9 @@ public Builder setIgnoreUnknownFields(boolean ignoreUnknownFields) { } /** - * Setter for a reconnectOnStuck, temporaily workaround for omg/48020 + * Setter for a reconnectOnStuck, temporaily workaround for omg/48020. Fix for the omg is supposed to roll out + * by 2/11/2022 Friday. If you set this to True, your write will be slower (0.75MB/s per connection), but your + * writes will not be stuck as a sympton of omg/48020. * * @param reconnectOnStuck * @return Builder From dbd2f5be27dcd969158ed222c1cf21799d99507f Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 8 Feb 2022 11:44:08 -0800 Subject: [PATCH 06/18] . --- .../bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index 5b966f7a67..e2a966fbf9 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -412,6 +412,7 @@ public void testJsonStreamWriterWithDefaultStream() } } + // This test runs about 4-5 minutes, sometimes 10 minutes. @Test public void testJsonStreamWriterWithDefaultStreamLarge() throws IOException, InterruptedException, ExecutionException, From ea508f0850b1ae06c68c0ebacdd62f4943f42812 Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 8 Feb 2022 11:49:22 -0800 Subject: [PATCH 07/18] . --- .../google/cloud/bigquery/storage/v1/JsonStreamWriter.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 644540ae8f..4e864f567d 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -405,9 +405,9 @@ public Builder setIgnoreUnknownFields(boolean ignoreUnknownFields) { } /** - * Setter for a reconnectOnStuck, temporaily workaround for omg/48020. Fix for the omg is supposed to roll out - * by 2/11/2022 Friday. If you set this to True, your write will be slower (0.75MB/s per connection), but your - * writes will not be stuck as a sympton of omg/48020. + * Setter for a reconnectOnStuck, temporaily workaround for omg/48020. Fix for the omg is + * supposed to roll out by 2/11/2022 Friday. If you set this to True, your write will be slower + * (0.75MB/s per connection), but your writes will not be stuck as a sympton of omg/48020. * * @param reconnectOnStuck * @return Builder From 759552bcabec21f02649172a77e7bd3494885473 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 8 Feb 2022 19:50:48 +0000 Subject: [PATCH 08/18] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 2 +- .../google/cloud/bigquery/storage/v1/JsonStreamWriter.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 7c1c7f1ac0..1fab747ef2 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ If you are using Maven without BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies ```Groovy -implementation platform('com.google.cloud:libraries-bom:24.2.0') +implementation platform('com.google.cloud:libraries-bom:24.3.0') implementation 'com.google.cloud:google-cloud-bigquerystorage' ``` diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 644540ae8f..4e864f567d 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -405,9 +405,9 @@ public Builder setIgnoreUnknownFields(boolean ignoreUnknownFields) { } /** - * Setter for a reconnectOnStuck, temporaily workaround for omg/48020. Fix for the omg is supposed to roll out - * by 2/11/2022 Friday. If you set this to True, your write will be slower (0.75MB/s per connection), but your - * writes will not be stuck as a sympton of omg/48020. + * Setter for a reconnectOnStuck, temporaily workaround for omg/48020. Fix for the omg is + * supposed to roll out by 2/11/2022 Friday. If you set this to True, your write will be slower + * (0.75MB/s per connection), but your writes will not be stuck as a sympton of omg/48020. * * @param reconnectOnStuck * @return Builder From f4f37eb54abd5e41bc4176b17596481bbbe5a1a9 Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 8 Feb 2022 12:04:38 -0800 Subject: [PATCH 09/18] . --- .../bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index e2a966fbf9..b2364cbbb0 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -449,7 +449,7 @@ public void testJsonStreamWriterWithDefaultStreamLarge() for (int j = 0; j < rowBatch; j++) { jsonArr.put(row); } - LOG.fine("Appending: " + k + "/" + totalRequest); + LOG.info("Appending: " + k + "/" + totalRequest); allResponses.add(jsonStreamWriter.append(jsonArr, k * rowBatch)); } } From e578493740a1af29f5c739507dd7195ebd7af19d Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 8 Feb 2022 12:45:36 -0800 Subject: [PATCH 10/18] . --- .../storage/v1/it/ITBigQueryWriteManualClientTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index b2364cbbb0..ef009b4b71 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -412,7 +412,7 @@ public void testJsonStreamWriterWithDefaultStream() } } - // This test runs about 4-5 minutes, sometimes 10 minutes. + // This test runs about 1 min. @Test public void testJsonStreamWriterWithDefaultStreamLarge() throws IOException, InterruptedException, ExecutionException, @@ -432,8 +432,8 @@ public void testJsonStreamWriterWithDefaultStreamLarge() .setWriteStream( WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) .build()); - int totalRequest = 100; - int rowBatch = 20000; + int totalRequest = 10; + int rowBatch = 40000; ArrayList> allResponses = new ArrayList>(totalRequest); // Sends a total of 150MB over the wire. @@ -445,7 +445,7 @@ public void testJsonStreamWriterWithDefaultStreamLarge() JSONObject row = new JSONObject(); row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); JSONArray jsonArr = new JSONArray(); - // 1.5MB batch. + // 3MB batch. for (int j = 0; j < rowBatch; j++) { jsonArr.put(row); } From e0093cae53fba2a6b31bb367d912fa2cb8aad9ca Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 8 Feb 2022 12:51:17 -0800 Subject: [PATCH 11/18] . --- .../bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index ef009b4b71..c87fe08cc8 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -436,7 +436,7 @@ public void testJsonStreamWriterWithDefaultStreamLarge() int rowBatch = 40000; ArrayList> allResponses = new ArrayList>(totalRequest); - // Sends a total of 150MB over the wire. + // Sends a total of 30MB over the wire. try (JsonStreamWriter jsonStreamWriter = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) .setReconnectOnStuck(true) From ecc93f607f430ddc5e1231d527fd8c1253d17d89 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Tue, 8 Feb 2022 13:38:23 -0800 Subject: [PATCH 12/18] Update ITBigQueryWriteManualClientTest.java --- .../bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index c87fe08cc8..9663a7cac0 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -459,7 +459,7 @@ public void testJsonStreamWriterWithDefaultStreamLarge() Assert.assertEquals( allResponses.get(i).get().getAppendResult().getOffset().getValue(), i * rowBatch); } catch (ExecutionException ex) { - if (ex.toString().contains("The offset is within stream,")) { + if (!ex.toString().contains("The offset is within stream,")) { Assert.fail("Unexpected error " + ex); } } From 133d983b7b783d22ecdb60cd6f396c2e178962db Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Tue, 8 Feb 2022 13:39:10 -0800 Subject: [PATCH 13/18] Update ITBigQueryWriteManualClientTest.java all errors should fail. --- .../storage/v1/it/ITBigQueryWriteManualClientTest.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index 9663a7cac0..e2239562ae 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -459,9 +459,7 @@ public void testJsonStreamWriterWithDefaultStreamLarge() Assert.assertEquals( allResponses.get(i).get().getAppendResult().getOffset().getValue(), i * rowBatch); } catch (ExecutionException ex) { - if (!ex.toString().contains("The offset is within stream,")) { - Assert.fail("Unexpected error " + ex); - } + Assert.fail("Unexpected error " + ex); } } } From 9f34e9065ff5436993a7968dc4078b6efdaade1b Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 8 Feb 2022 13:43:37 -0800 Subject: [PATCH 14/18] . --- .../bigquery/storage/v1/JsonStreamWriter.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 4e864f567d..e2bf57790c 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -53,7 +53,7 @@ public class JsonStreamWriter implements AutoCloseable { private Descriptor descriptor; private TableSchema tableSchema; private boolean ignoreUnknownFields = false; - private boolean reconnectOnStuck = false; + private boolean reconnectAfter10M = false; private long totalMessageSize = 0; private long absTotal = 0; private ProtoSchema protoSchema; @@ -88,7 +88,7 @@ private JsonStreamWriter(Builder builder) this.streamName = builder.streamName; this.tableSchema = builder.tableSchema; this.ignoreUnknownFields = builder.ignoreUnknownFields; - this.reconnectOnStuck = builder.reconnectOnStuck; + this.reconnectAfter10M = builder.reconnectAfter10M; } /** @@ -154,7 +154,7 @@ public ApiFuture append(JSONArray jsonArr, long offset) this.totalMessageSize += currentRequestSize; this.absTotal += currentRequestSize; // Reconnect on every 9.5MB. - if (this.totalMessageSize > 9500000 && this.reconnectOnStuck) { + if (this.totalMessageSize > 9500000 && this.reconnectAfter10M) { streamWriter.close(); // Create a new underlying StreamWriter with the updated TableSchema and Descriptor this.streamWriter = streamWriterBuilder.setWriterSchema(protoSchema).build(); @@ -290,7 +290,7 @@ public static final class Builder { private boolean createDefaultStream = false; private String traceId; private boolean ignoreUnknownFields = false; - private boolean reconnectOnStuck = false; + private boolean reconnectAfter10M = false; private static String streamPatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+"; @@ -405,15 +405,15 @@ public Builder setIgnoreUnknownFields(boolean ignoreUnknownFields) { } /** - * Setter for a reconnectOnStuck, temporaily workaround for omg/48020. Fix for the omg is + * Setter for a reconnectAfter10M, temporaily workaround for omg/48020. Fix for the omg is * supposed to roll out by 2/11/2022 Friday. If you set this to True, your write will be slower * (0.75MB/s per connection), but your writes will not be stuck as a sympton of omg/48020. * - * @param reconnectOnStuck + * @param reconnectAfter10M * @return Builder */ - public Builder setReconnectOnStuck(boolean reconnectOnStuck) { - this.reconnectOnStuck = reconnectOnStuck; + public Builder setReconnectAfter10M(boolean reconnectAfter10M) { + this.reconnectAfter10M = reconnectAfter10M; return this; } From 48402b5b6453dd213d31304201e82bbc037e198b Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 8 Feb 2022 13:44:25 -0800 Subject: [PATCH 15/18] . --- .../bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index e2239562ae..116259a54e 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -439,7 +439,7 @@ public void testJsonStreamWriterWithDefaultStreamLarge() // Sends a total of 30MB over the wire. try (JsonStreamWriter jsonStreamWriter = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) - .setReconnectOnStuck(true) + .setReconnectAfter10M(true) .build()) { for (int k = 0; k < totalRequest; k++) { JSONObject row = new JSONObject(); From 0906d6e576579816d10da18aeefc22c67c3f20c5 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Tue, 8 Feb 2022 13:48:55 -0800 Subject: [PATCH 16/18] update comment. --- .../com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index e2bf57790c..b322ea3bca 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -156,7 +156,7 @@ public ApiFuture append(JSONArray jsonArr, long offset) // Reconnect on every 9.5MB. if (this.totalMessageSize > 9500000 && this.reconnectAfter10M) { streamWriter.close(); - // Create a new underlying StreamWriter with the updated TableSchema and Descriptor + // Create a new underlying StreamWriter aka establish a new connection. this.streamWriter = streamWriterBuilder.setWriterSchema(protoSchema).build(); this.totalMessageSize = this.protoSchema.getSerializedSize() + currentRequestSize; this.absTotal += currentRequestSize; From a152337bd3bec3f994285e2d30d9b0ded88920d8 Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 8 Feb 2022 14:53:16 -0800 Subject: [PATCH 17/18] . --- .../storage/v1/it/ITBigQueryWriteManualClientTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index 116259a54e..86b3af8352 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -414,7 +414,7 @@ public void testJsonStreamWriterWithDefaultStream() // This test runs about 1 min. @Test - public void testJsonStreamWriterWithDefaultStreamLarge() + public void testJsonStreamWriterWithMessagesOver10M() throws IOException, InterruptedException, ExecutionException, Descriptors.DescriptorValidationException { String tableName = "TableLarge"; @@ -439,7 +439,7 @@ public void testJsonStreamWriterWithDefaultStreamLarge() // Sends a total of 30MB over the wire. try (JsonStreamWriter jsonStreamWriter = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) - .setReconnectAfter10M(true) + .setReconnectAfter10M(false) .build()) { for (int k = 0; k < totalRequest; k++) { JSONObject row = new JSONObject(); From b169620458e57b558b67a97ed79acecab72faf50 Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 8 Feb 2022 14:58:29 -0800 Subject: [PATCH 18/18] . --- .../bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index 86b3af8352..796e1acb29 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -439,7 +439,7 @@ public void testJsonStreamWriterWithMessagesOver10M() // Sends a total of 30MB over the wire. try (JsonStreamWriter jsonStreamWriter = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) - .setReconnectAfter10M(false) + .setReconnectAfter10M(true) .build()) { for (int k = 0; k < totalRequest; k++) { JSONObject row = new JSONObject();