diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index c4463715f4..3186c282f4 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -78,10 +78,6 @@ private JsonStreamWriter(Builder builder) this.protoSchema = ProtoSchemaConverter.convert(this.descriptor); this.totalMessageSize = protoSchema.getSerializedSize(); streamWriterBuilder.setWriterSchema(protoSchema); - if (builder.flowControlSettings != null) { - streamWriterBuilder.setLimitExceededBehavior( - builder.flowControlSettings.getLimitExceededBehavior()); - } setStreamWriterSettings( builder.channelProvider, builder.credentialsProvider, diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index ef4bc8cafa..2c4ecdd75f 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -314,6 +314,16 @@ private ApiFuture appendInternal(AppendRowsRequest message) .withDescription("Connection is already closed"))); return requestWrapper.appendResult; } + // Check if queue is going to be full before adding the request. + if ((this.inflightRequests + 1 >= this.maxInflightRequests + || this.inflightBytes + requestWrapper.messageSize >= this.maxInflightBytes) + && (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException)) { + throw new StatusRuntimeException( + Status.fromCode(Code.RESOURCE_EXHAUSTED) + .withDescription( + "Exceeds client side inflight buffer, consider add more buffer or open more connections.")); + } + if (connectionFinalStatus != null) { requestWrapper.appendResult.setException( new StatusRuntimeException( @@ -339,29 +349,18 @@ private void maybeWaitForInflightQuota() { long start_time = System.currentTimeMillis(); while (this.inflightRequests >= this.maxInflightRequests || this.inflightBytes >= this.maxInflightBytes) { - if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException) { - throw new StatusRuntimeException( - Status.fromCode(Code.RESOURCE_EXHAUSTED) - .withDescription( - "Exceeds client side inflight buffer, consider add more buffer or open more connections.")); - } else if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.Ignore) { + try { + inflightReduced.await(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.warning( + "Interrupted while waiting for inflight quota. Stream: " + + streamName + + " Error: " + + e.toString()); throw new StatusRuntimeException( - Status.fromCode(Code.INVALID_ARGUMENT) - .withDescription("LimitExceededBehavior.Ignore is not supported on StreamWriter.")); - } else { - try { - inflightReduced.await(100, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - log.warning( - "Interrupted while waiting for inflight quota. Stream: " - + streamName - + " Error: " - + e.toString()); - throw new StatusRuntimeException( - Status.fromCode(Code.CANCELLED) - .withCause(e) - .withDescription("Interrupted while waiting for quota.")); - } + Status.fromCode(Code.CANCELLED) + .withCause(e) + .withDescription("Interrupted while waiting for quota.")); } } inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000); @@ -812,7 +811,12 @@ public Builder setTraceId(String traceId) { * @return */ public Builder setLimitExceededBehavior( - FlowController.LimitExceededBehavior limitExceededBehavior) { + FlowController.LimitExceededBehavior limitExceededBehavior) throws StatusRuntimeException { + if (limitExceededBehavior == FlowController.LimitExceededBehavior.Ignore) { + throw new StatusRuntimeException( + Status.fromCode(Code.INVALID_ARGUMENT) + .withDescription("LimitExceededBehavior.Ignore is not supported on StreamWriter.")); + } this.limitExceededBehavior = limitExceededBehavior; return this; } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index 2a79b01d90..4a19f17e24 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -581,4 +581,26 @@ public void run() throws Throwable { "Exceeds client side inflight buffer, consider add more buffer or open more connections")); } } + + // This is to test the new addition didn't break previous settings, i.e., sets the inflight limit + // without limit beahvior. + @Test + public void testFlowControlSettingNoLimitBehavior() throws Exception { + TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build(); + try (JsonStreamWriter writer = + JsonStreamWriter.newBuilder(TEST_STREAM, tableSchema) + .setChannelProvider(channelProvider) + .setCredentialsProvider(NoCredentialsProvider.create()) + .setFlowControlSettings( + FlowControlSettings.newBuilder().setMaxOutstandingRequestBytes(1L).build()) + .build()) { + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + JSONObject foo = new JSONObject(); + foo.put("test_int", 10); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + ApiFuture appendFuture = writer.append(jsonArr); + appendFuture.get(); + } + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 4962a41a6a..9078e581bd 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -552,12 +552,6 @@ public void testAppendsWithTinyMaxInflightBytesThrow() throws Exception { .setMaxInflightBytes(1) .setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException) .build(); - // Server will sleep 100ms before every response. - testBigQueryWrite.setResponseSleep(Duration.ofMillis(100)); - long appendCount = 10; - for (int i = 0; i < appendCount; i++) { - testBigQueryWrite.addResponse(createAppendResponse(i)); - } StatusRuntimeException ex = assertThrows( StatusRuntimeException.class, @@ -577,6 +571,29 @@ public void run() throws Throwable { writer.close(); } + @Test + public void testLimitBehaviorIgnoreNotAccepted() throws Exception { + StatusRuntimeException ex = + assertThrows( + StatusRuntimeException.class, + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + StreamWriter writer = + StreamWriter.newBuilder(TEST_STREAM, client) + .setWriterSchema(createProtoSchema()) + .setMaxInflightBytes(1) + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Ignore) + .build(); + } + }); + assertEquals(ex.getStatus().getCode(), Status.INVALID_ARGUMENT.getCode()); + assertTrue( + ex.getStatus() + .getDescription() + .contains("LimitExceededBehavior.Ignore is not supported on StreamWriter.")); + } + @Test public void testMessageTooLarge() throws Exception { StreamWriter writer = getTestStreamWriter();