Skip to content

Commit

Permalink
feat: Add max size check to StreamWriterV2 (#873)
Browse files Browse the repository at this point in the history
* Add a new StreamWriterV2.

Compared to existing StreamWriter, its locking mechanism is much
simpler.

* Stop using Java8 features as we still need to support Java7

* Do not hold lock while sending requests, and some minor refactoring.

* Add max message size check.
  • Loading branch information
yayi-google authored Feb 23, 2021
1 parent de747ec commit 0261af4
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.util.Deque;
import java.util.LinkedList;
Expand All @@ -39,8 +40,6 @@
*
* <p>TODO: Attach schema.
*
* <p>TODO: Add max size check.
*
* <p>TODO: Add inflight control.
*
* <p>TODO: Attach traceId.
Expand Down Expand Up @@ -94,6 +93,11 @@ public class StreamWriterV2 implements AutoCloseable {
*/
private Thread appendThread;

/** The maximum size of one request. Defined by the API. */
public static long getApiMaxRequestBytes() {
return 8L * 1000L * 1000L; // 8 megabytes (https://en.wikipedia.org/wiki/Megabyte)
}

private StreamWriterV2(Builder builder) {
this.lock = new ReentrantLock();
this.hasMessageInWaitingQueue = lock.newCondition();
Expand Down Expand Up @@ -154,6 +158,17 @@ public void run() {
*/
public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message);
if (requestWrapper.messageSize > getApiMaxRequestBytes()) {
requestWrapper.appendResult.setException(
new StatusRuntimeException(
Status.fromCode(Code.INVALID_ARGUMENT)
.withDescription(
"MessageSize is too large. Max allow: "
+ getApiMaxRequestBytes()
+ " Actual: "
+ requestWrapper.messageSize)));
return requestWrapper.appendResult;
}
this.lock.lock();
try {
if (userClosed) {
Expand Down Expand Up @@ -355,10 +370,12 @@ public StreamWriterV2 build() {
private static final class AppendRequestAndResponse {
final SettableApiFuture<AppendRowsResponse> appendResult;
final AppendRowsRequest message;
final long messageSize;

AppendRequestAndResponse(AppendRowsRequest message) {
this.appendResult = SettableApiFuture.create();
this.message = message;
this.messageSize = message.getProtoRows().getSerializedSize();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.common.base.Strings;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
Expand Down Expand Up @@ -288,6 +289,20 @@ public void serverCloseWhileRequestsInflight() throws Exception {
}

writer.close();
;
}

@Test
public void testMessageTooLarge() {
StreamWriterV2 writer = getTestStreamWriterV2();

String oversized = Strings.repeat("a", (int) (StreamWriterV2.getApiMaxRequestBytes() + 1));
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {oversized});
assertTrue(appendFuture1.isDone());
StatusRuntimeException actualError =
assertFutureException(StatusRuntimeException.class, appendFuture1);
assertEquals(Status.Code.INVALID_ARGUMENT, actualError.getStatus().getCode());
assertTrue(actualError.getStatus().getDescription().contains("MessageSize is too large"));

writer.close();
}
}

0 comments on commit 0261af4

Please sign in to comment.