From 88df0612d05960ec02378d92f0774bd9212e4257 Mon Sep 17 00:00:00 2001 From: yirutang Date: Thu, 5 May 2022 17:04:18 -0700 Subject: [PATCH 01/11] fix[1539] --- .../google/cloud/bigquery/storage/v1/StreamWriterTest.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 4962a41a6a..e6cffc2012 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -15,10 +15,7 @@ */ package com.google.cloud.bigquery.storage.v1; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import com.google.api.core.ApiFuture; import com.google.api.gax.batching.FlowController; From 922d7448c0972b653ff2096fa12974a8659e50ea Mon Sep 17 00:00:00 2001 From: yirutang Date: Thu, 5 May 2022 17:13:41 -0700 Subject: [PATCH 02/11] . --- .../google/cloud/bigquery/storage/v1/StreamWriterTest.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index e6cffc2012..628c44afd6 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -15,7 +15,11 @@ */ package com.google.cloud.bigquery.storage.v1; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import com.google.api.core.ApiFuture; import com.google.api.gax.batching.FlowController; From da756855d01af668e920758a41da63f5ca7a83fd Mon Sep 17 00:00:00 2001 From: yirutang Date: Fri, 6 May 2022 00:23:49 -0700 Subject: [PATCH 03/11] . --- .../com/google/cloud/bigquery/storage/v1/StreamWriterTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 628c44afd6..4962a41a6a 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -19,7 +19,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import com.google.api.core.ApiFuture; import com.google.api.gax.batching.FlowController; From 080e385eb77a9814facb45265b9847bfd48401ab Mon Sep 17 00:00:00 2001 From: yirutang Date: Fri, 6 May 2022 00:17:49 -0700 Subject: [PATCH 04/11] fix: Add an extra jsonWriterTest for Limit Behavior --- .../bigquery/storage/v1/JsonStreamWriter.java | 4 ---- .../storage/v1/JsonStreamWriterTest.java | 22 +++++++++++++++++++ 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index c4463715f4..3186c282f4 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -78,10 +78,6 @@ private JsonStreamWriter(Builder builder) this.protoSchema = ProtoSchemaConverter.convert(this.descriptor); this.totalMessageSize = protoSchema.getSerializedSize(); streamWriterBuilder.setWriterSchema(protoSchema); - if (builder.flowControlSettings != null) { - streamWriterBuilder.setLimitExceededBehavior( - builder.flowControlSettings.getLimitExceededBehavior()); - } setStreamWriterSettings( builder.channelProvider, builder.credentialsProvider, diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index 2a79b01d90..50e66ea6ec 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -560,6 +560,7 @@ public void testFlowControlSetting() throws Exception { .setMaxOutstandingRequestBytes(1L) .build()) .build()) { + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); JSONObject foo = new JSONObject(); foo.put("test_int", 10); JSONArray jsonArr = new JSONArray(); @@ -581,4 +582,25 @@ public void run() throws Throwable { "Exceeds client side inflight buffer, consider add more buffer or open more connections")); } } + + @Test + public void testFlowControlSettingNoLimitBehavior() throws Exception { + TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build(); + try (JsonStreamWriter writer = + JsonStreamWriter.newBuilder(TEST_STREAM, tableSchema) + .setChannelProvider(channelProvider) + .setCredentialsProvider(NoCredentialsProvider.create()) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setMaxOutstandingRequestBytes(1L) + .build()) + .build()) { + JSONObject foo = new JSONObject(); + foo.put("test_int", 10); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + ApiFuture appendFuture = writer.append(jsonArr); + appendFuture.get(); + } + } } From 32f48ee3b5c82a2080a103bf8532a575a89fca33 Mon Sep 17 00:00:00 2001 From: yirutang Date: Fri, 6 May 2022 00:27:31 -0700 Subject: [PATCH 05/11] . --- .../google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index 50e66ea6ec..3fb3ac5864 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -560,7 +560,6 @@ public void testFlowControlSetting() throws Exception { .setMaxOutstandingRequestBytes(1L) .build()) .build()) { - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); JSONObject foo = new JSONObject(); foo.put("test_int", 10); JSONArray jsonArr = new JSONArray(); @@ -595,6 +594,7 @@ public void testFlowControlSettingNoLimitBehavior() throws Exception { .setMaxOutstandingRequestBytes(1L) .build()) .build()) { + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); JSONObject foo = new JSONObject(); foo.put("test_int", 10); JSONArray jsonArr = new JSONArray(); From 87c899182cecb7bad88355ec6703f434af420681 Mon Sep 17 00:00:00 2001 From: yirutang Date: Fri, 6 May 2022 00:29:02 -0700 Subject: [PATCH 06/11] . --- .../google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index 3fb3ac5864..0685511bb4 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -582,6 +582,8 @@ public void run() throws Throwable { } } + // This is to test the new addition didn't break previous settings, i.e., sets the inflight limit without limit + // beahviro. @Test public void testFlowControlSettingNoLimitBehavior() throws Exception { TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build(); From 4a6a1d1d5cdcc1c2456119625726607d93c927a2 Mon Sep 17 00:00:00 2001 From: yirutang Date: Fri, 6 May 2022 00:36:11 -0700 Subject: [PATCH 07/11] . --- .../storage/v1/JsonStreamWriterTest.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index 0685511bb4..c69120594b 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -582,20 +582,19 @@ public void run() throws Throwable { } } - // This is to test the new addition didn't break previous settings, i.e., sets the inflight limit without limit - // beahviro. + // This is to test the new addition didn't break previous settings, i.e., sets the inflight limit + // without + // limit beahvior. @Test public void testFlowControlSettingNoLimitBehavior() throws Exception { TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build(); try (JsonStreamWriter writer = - JsonStreamWriter.newBuilder(TEST_STREAM, tableSchema) - .setChannelProvider(channelProvider) - .setCredentialsProvider(NoCredentialsProvider.create()) - .setFlowControlSettings( - FlowControlSettings.newBuilder() - .setMaxOutstandingRequestBytes(1L) - .build()) - .build()) { + JsonStreamWriter.newBuilder(TEST_STREAM, tableSchema) + .setChannelProvider(channelProvider) + .setCredentialsProvider(NoCredentialsProvider.create()) + .setFlowControlSettings( + FlowControlSettings.newBuilder().setMaxOutstandingRequestBytes(1L).build()) + .build()) { testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); JSONObject foo = new JSONObject(); foo.put("test_int", 10); From cf744ff2ce08cd65108767d9e7349c1e6fa18c6e Mon Sep 17 00:00:00 2001 From: yirutang Date: Fri, 6 May 2022 00:36:48 -0700 Subject: [PATCH 08/11] . --- .../google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index c69120594b..4a19f17e24 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -583,8 +583,7 @@ public void run() throws Throwable { } // This is to test the new addition didn't break previous settings, i.e., sets the inflight limit - // without - // limit beahvior. + // without limit beahvior. @Test public void testFlowControlSettingNoLimitBehavior() throws Exception { TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build(); From 52c69bbf83d22143114252bd78c8a223112d18d1 Mon Sep 17 00:00:00 2001 From: yirutang Date: Fri, 6 May 2022 14:44:01 -0700 Subject: [PATCH 09/11] fix an issue that we should reject request before it is added to the queue. --- .../bigquery/storage/v1/StreamWriter.java | 50 ++++++++++--------- .../bigquery/storage/v1/StreamWriterTest.java | 29 ++++++++--- 2 files changed, 50 insertions(+), 29 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index ef4bc8cafa..9f54cd6b25 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -314,6 +314,16 @@ private ApiFuture appendInternal(AppendRowsRequest message) .withDescription("Connection is already closed"))); return requestWrapper.appendResult; } + if (this.inflightRequests + 1 >= this.maxInflightRequests + || this.inflightBytes + requestWrapper.messageSize >= this.maxInflightBytes) { + if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException) { + throw new StatusRuntimeException( + Status.fromCode(Code.RESOURCE_EXHAUSTED) + .withDescription( + "Exceeds client side inflight buffer, consider add more buffer or open more connections.")); + } + } + if (connectionFinalStatus != null) { requestWrapper.appendResult.setException( new StatusRuntimeException( @@ -339,29 +349,18 @@ private void maybeWaitForInflightQuota() { long start_time = System.currentTimeMillis(); while (this.inflightRequests >= this.maxInflightRequests || this.inflightBytes >= this.maxInflightBytes) { - if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException) { - throw new StatusRuntimeException( - Status.fromCode(Code.RESOURCE_EXHAUSTED) - .withDescription( - "Exceeds client side inflight buffer, consider add more buffer or open more connections.")); - } else if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.Ignore) { + try { + inflightReduced.await(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.warning( + "Interrupted while waiting for inflight quota. Stream: " + + streamName + + " Error: " + + e.toString()); throw new StatusRuntimeException( - Status.fromCode(Code.INVALID_ARGUMENT) - .withDescription("LimitExceededBehavior.Ignore is not supported on StreamWriter.")); - } else { - try { - inflightReduced.await(100, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - log.warning( - "Interrupted while waiting for inflight quota. Stream: " - + streamName - + " Error: " - + e.toString()); - throw new StatusRuntimeException( - Status.fromCode(Code.CANCELLED) - .withCause(e) - .withDescription("Interrupted while waiting for quota.")); - } + Status.fromCode(Code.CANCELLED) + .withCause(e) + .withDescription("Interrupted while waiting for quota.")); } } inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000); @@ -812,7 +811,12 @@ public Builder setTraceId(String traceId) { * @return */ public Builder setLimitExceededBehavior( - FlowController.LimitExceededBehavior limitExceededBehavior) { + FlowController.LimitExceededBehavior limitExceededBehavior) throws StatusRuntimeException { + if (limitExceededBehavior == FlowController.LimitExceededBehavior.Ignore) { + throw new StatusRuntimeException( + Status.fromCode(Code.INVALID_ARGUMENT) + .withDescription("LimitExceededBehavior.Ignore is not supported on StreamWriter.")); + } this.limitExceededBehavior = limitExceededBehavior; return this; } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 4962a41a6a..9078e581bd 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -552,12 +552,6 @@ public void testAppendsWithTinyMaxInflightBytesThrow() throws Exception { .setMaxInflightBytes(1) .setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException) .build(); - // Server will sleep 100ms before every response. - testBigQueryWrite.setResponseSleep(Duration.ofMillis(100)); - long appendCount = 10; - for (int i = 0; i < appendCount; i++) { - testBigQueryWrite.addResponse(createAppendResponse(i)); - } StatusRuntimeException ex = assertThrows( StatusRuntimeException.class, @@ -577,6 +571,29 @@ public void run() throws Throwable { writer.close(); } + @Test + public void testLimitBehaviorIgnoreNotAccepted() throws Exception { + StatusRuntimeException ex = + assertThrows( + StatusRuntimeException.class, + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + StreamWriter writer = + StreamWriter.newBuilder(TEST_STREAM, client) + .setWriterSchema(createProtoSchema()) + .setMaxInflightBytes(1) + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Ignore) + .build(); + } + }); + assertEquals(ex.getStatus().getCode(), Status.INVALID_ARGUMENT.getCode()); + assertTrue( + ex.getStatus() + .getDescription() + .contains("LimitExceededBehavior.Ignore is not supported on StreamWriter.")); + } + @Test public void testMessageTooLarge() throws Exception { StreamWriter writer = getTestStreamWriter(); From 0c42073b27f331a632791190e0aaab6cc2bfd440 Mon Sep 17 00:00:00 2001 From: yirutang Date: Fri, 6 May 2022 14:45:25 -0700 Subject: [PATCH 10/11] . --- .../java/com/google/cloud/bigquery/storage/v1/StreamWriter.java | 1 + 1 file changed, 1 insertion(+) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 9f54cd6b25..13255c50bc 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -314,6 +314,7 @@ private ApiFuture appendInternal(AppendRowsRequest message) .withDescription("Connection is already closed"))); return requestWrapper.appendResult; } + // Check if queue is going to be full before adding the request. if (this.inflightRequests + 1 >= this.maxInflightRequests || this.inflightBytes + requestWrapper.messageSize >= this.maxInflightBytes) { if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException) { From 0de5593222ea0ff28477bfdfb18140027beadd87 Mon Sep 17 00:00:00 2001 From: yirutang Date: Fri, 6 May 2022 15:43:58 -0700 Subject: [PATCH 11/11] . --- .../cloud/bigquery/storage/v1/StreamWriter.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 13255c50bc..2c4ecdd75f 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -315,14 +315,13 @@ private ApiFuture appendInternal(AppendRowsRequest message) return requestWrapper.appendResult; } // Check if queue is going to be full before adding the request. - if (this.inflightRequests + 1 >= this.maxInflightRequests - || this.inflightBytes + requestWrapper.messageSize >= this.maxInflightBytes) { - if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException) { - throw new StatusRuntimeException( - Status.fromCode(Code.RESOURCE_EXHAUSTED) - .withDescription( - "Exceeds client side inflight buffer, consider add more buffer or open more connections.")); - } + if ((this.inflightRequests + 1 >= this.maxInflightRequests + || this.inflightBytes + requestWrapper.messageSize >= this.maxInflightBytes) + && (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException)) { + throw new StatusRuntimeException( + Status.fromCode(Code.RESOURCE_EXHAUSTED) + .withDescription( + "Exceeds client side inflight buffer, consider add more buffer or open more connections.")); } if (connectionFinalStatus != null) {