Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add extra JsonWriterTest to show that the LimitBehavior addition is not breaking #1643

Merged
merged 11 commits into from
May 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,6 @@ private JsonStreamWriter(Builder builder)
this.protoSchema = ProtoSchemaConverter.convert(this.descriptor);
this.totalMessageSize = protoSchema.getSerializedSize();
streamWriterBuilder.setWriterSchema(protoSchema);
if (builder.flowControlSettings != null) {
GaoleMeng marked this conversation as resolved.
Show resolved Hide resolved
streamWriterBuilder.setLimitExceededBehavior(
builder.flowControlSettings.getLimitExceededBehavior());
}
setStreamWriterSettings(
builder.channelProvider,
builder.credentialsProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,16 @@ private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message)
.withDescription("Connection is already closed")));
return requestWrapper.appendResult;
}
// Check if queue is going to be full before adding the request.
if ((this.inflightRequests + 1 >= this.maxInflightRequests
|| this.inflightBytes + requestWrapper.messageSize >= this.maxInflightBytes)
&& (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException)) {
throw new StatusRuntimeException(
Status.fromCode(Code.RESOURCE_EXHAUSTED)
.withDescription(
"Exceeds client side inflight buffer, consider add more buffer or open more connections."));
}

if (connectionFinalStatus != null) {
requestWrapper.appendResult.setException(
new StatusRuntimeException(
Expand All @@ -339,29 +349,18 @@ private void maybeWaitForInflightQuota() {
long start_time = System.currentTimeMillis();
while (this.inflightRequests >= this.maxInflightRequests
|| this.inflightBytes >= this.maxInflightBytes) {
if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException) {
throw new StatusRuntimeException(
Status.fromCode(Code.RESOURCE_EXHAUSTED)
.withDescription(
"Exceeds client side inflight buffer, consider add more buffer or open more connections."));
} else if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.Ignore) {
try {
inflightReduced.await(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warning(
"Interrupted while waiting for inflight quota. Stream: "
+ streamName
+ " Error: "
+ e.toString());
throw new StatusRuntimeException(
Status.fromCode(Code.INVALID_ARGUMENT)
.withDescription("LimitExceededBehavior.Ignore is not supported on StreamWriter."));
} else {
try {
inflightReduced.await(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warning(
"Interrupted while waiting for inflight quota. Stream: "
+ streamName
+ " Error: "
+ e.toString());
throw new StatusRuntimeException(
Status.fromCode(Code.CANCELLED)
.withCause(e)
.withDescription("Interrupted while waiting for quota."));
}
Status.fromCode(Code.CANCELLED)
.withCause(e)
.withDescription("Interrupted while waiting for quota."));
}
}
inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000);
Expand Down Expand Up @@ -812,7 +811,12 @@ public Builder setTraceId(String traceId) {
* @return
*/
public Builder setLimitExceededBehavior(
FlowController.LimitExceededBehavior limitExceededBehavior) {
FlowController.LimitExceededBehavior limitExceededBehavior) throws StatusRuntimeException {
if (limitExceededBehavior == FlowController.LimitExceededBehavior.Ignore) {
throw new StatusRuntimeException(
Status.fromCode(Code.INVALID_ARGUMENT)
.withDescription("LimitExceededBehavior.Ignore is not supported on StreamWriter."));
}
this.limitExceededBehavior = limitExceededBehavior;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,4 +581,26 @@ public void run() throws Throwable {
"Exceeds client side inflight buffer, consider add more buffer or open more connections"));
}
}

// This is to test the new addition didn't break previous settings, i.e., sets the inflight limit
// without limit beahvior.
@Test
public void testFlowControlSettingNoLimitBehavior() throws Exception {
yirutang marked this conversation as resolved.
Show resolved Hide resolved
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(TEST_STREAM, tableSchema)
.setChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.setFlowControlSettings(
FlowControlSettings.newBuilder().setMaxOutstandingRequestBytes(1L).build())
.build()) {
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
JSONObject foo = new JSONObject();
foo.put("test_int", 10);
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
appendFuture.get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -552,12 +552,6 @@ public void testAppendsWithTinyMaxInflightBytesThrow() throws Exception {
.setMaxInflightBytes(1)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
.build();
// Server will sleep 100ms before every response.
testBigQueryWrite.setResponseSleep(Duration.ofMillis(100));
long appendCount = 10;
for (int i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}
StatusRuntimeException ex =
assertThrows(
StatusRuntimeException.class,
Expand All @@ -577,6 +571,29 @@ public void run() throws Throwable {
writer.close();
}

@Test
public void testLimitBehaviorIgnoreNotAccepted() throws Exception {
StatusRuntimeException ex =
assertThrows(
StatusRuntimeException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightBytes(1)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Ignore)
.build();
}
});
assertEquals(ex.getStatus().getCode(), Status.INVALID_ARGUMENT.getCode());
assertTrue(
ex.getStatus()
.getDescription()
.contains("LimitExceededBehavior.Ignore is not supported on StreamWriter."));
}

@Test
public void testMessageTooLarge() throws Exception {
StreamWriter writer = getTestStreamWriter();
Expand Down