diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java
index 10ed99d137..8debea15f9 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java
@@ -22,6 +22,7 @@
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import io.grpc.Status;
+import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.util.Deque;
import java.util.LinkedList;
@@ -39,8 +40,6 @@
*
*
TODO: Attach schema.
*
- *
TODO: Add max size check.
- *
*
TODO: Add inflight control.
*
*
TODO: Attach traceId.
@@ -94,6 +93,11 @@ public class StreamWriterV2 implements AutoCloseable {
*/
private Thread appendThread;
+ /** The maximum size of one request. Defined by the API. */
+ public static long getApiMaxRequestBytes() {
+ return 8L * 1000L * 1000L; // 8 megabytes (https://en.wikipedia.org/wiki/Megabyte)
+ }
+
private StreamWriterV2(Builder builder) {
this.lock = new ReentrantLock();
this.hasMessageInWaitingQueue = lock.newCondition();
@@ -154,6 +158,17 @@ public void run() {
*/
public ApiFuture append(AppendRowsRequest message) {
AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message);
+ if (requestWrapper.messageSize > getApiMaxRequestBytes()) {
+ requestWrapper.appendResult.setException(
+ new StatusRuntimeException(
+ Status.fromCode(Code.INVALID_ARGUMENT)
+ .withDescription(
+ "MessageSize is too large. Max allow: "
+ + getApiMaxRequestBytes()
+ + " Actual: "
+ + requestWrapper.messageSize)));
+ return requestWrapper.appendResult;
+ }
this.lock.lock();
try {
if (userClosed) {
@@ -355,10 +370,12 @@ public StreamWriterV2 build() {
private static final class AppendRequestAndResponse {
final SettableApiFuture appendResult;
final AppendRowsRequest message;
+ final long messageSize;
AppendRequestAndResponse(AppendRowsRequest message) {
this.appendResult = SettableApiFuture.create();
this.message = message;
+ this.messageSize = message.getProtoRows().getSerializedSize();
}
}
}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java
index c50e5abb70..4d6fba9dcd 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java
@@ -26,6 +26,7 @@
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.bigquery.storage.test.Test.FooType;
+import com.google.common.base.Strings;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
@@ -288,6 +289,20 @@ public void serverCloseWhileRequestsInflight() throws Exception {
}
writer.close();
- ;
+ }
+
+ @Test
+ public void testMessageTooLarge() {
+ StreamWriterV2 writer = getTestStreamWriterV2();
+
+ String oversized = Strings.repeat("a", (int) (StreamWriterV2.getApiMaxRequestBytes() + 1));
+ ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {oversized});
+ assertTrue(appendFuture1.isDone());
+ StatusRuntimeException actualError =
+ assertFutureException(StatusRuntimeException.class, appendFuture1);
+ assertEquals(Status.Code.INVALID_ARGUMENT, actualError.getStatus().getCode());
+ assertTrue(actualError.getStatus().getDescription().contains("MessageSize is too large"));
+
+ writer.close();
}
}