From f3865b06ea7c61e95d3ee9bc7b46857d9d3080cc Mon Sep 17 00:00:00 2001
From: yayi-google <75696801+yayi-google@users.noreply.github.com>
Date: Mon, 1 Mar 2021 10:23:57 -0800
Subject: [PATCH] feat: in StreamWriterV2, support a new append method that
 takes rows and offset (#894)
* feat: StreamWriterV2 will attach stream name in the first request in the connection and remove stream name and schema in the following ones.
* feat: In StreamWriterV2, add a new append method accepting rows and offset
* remove duplicated code
* add unit test for append rows without schema
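Usage sketch of the new surface (illustrative only, not taken from the diff): the stream name below is a placeholder, and FooType is the test-only generated proto used in StreamWriterV2Test (its import is omitted here).

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1beta2.ProtoRows;
import com.google.cloud.bigquery.storage.v1beta2.ProtoSchema;
import com.google.cloud.bigquery.storage.v1beta2.StreamWriterV2;
import com.google.protobuf.DescriptorProtos;

public class StreamWriterV2AppendSketch {
  public static void main(String[] args) throws Exception {
    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
      // Declare the row schema once on the builder; the writer attaches it,
      // together with the stream name, only to the first request of a connection.
      ProtoSchema schema =
          ProtoSchema.newBuilder()
              .setProtoDescriptor(
                  DescriptorProtos.DescriptorProto.newBuilder()
                      .setName("Message")
                      .addField(
                          DescriptorProtos.FieldDescriptorProto.newBuilder()
                              .setName("foo")
                              .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
                              .setNumber(1)
                              .build())
                      .build())
              .build();
      StreamWriterV2 writer =
          StreamWriterV2.newBuilder("projects/p/datasets/d/tables/t/streams/s", client)
              .setWriterSchema(schema)
              .build();
      // Rows are pre-serialized protos that match the schema declared above.
      ProtoRows rows =
          ProtoRows.newBuilder()
              .addSerializedRows(FooType.newBuilder().setFoo("bar").build().toByteString())
              .build();
      // New overload: rows plus an explicit offset; a negative offset leaves it unset.
      ApiFuture<AppendRowsResponse> response = writer.append(rows, 0);
      System.out.println("offset: " + response.get().getAppendResult().getOffset().getValue());
      writer.close();
    }
  }
}

Because only the first request on a connection carries the schema and stream name, every following append sends a smaller payload.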
---
.../storage/v1beta2/StreamWriterV2.java | 72 ++++++++++-
.../storage/v1beta2/StreamWriterV2Test.java | 120 +++++++++++++-----
2 files changed, 153 insertions(+), 39 deletions(-)
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java
index 6990f13173..afd492ae6b 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java
@@ -19,10 +19,12 @@
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest.ProtoData;
import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.DoneCallback;
import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.RequestCallback;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.protobuf.Int64Value;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
@@ -39,8 +41,6 @@
/**
* A BigQuery Stream Writer that can be used to write data into BigQuery Table.
*
- * <p>TODO: Attach schema.
- *
* <p>TODO: Attach traceId.
*
* <p>TODO: Support batching.
@@ -59,6 +59,11 @@ public class StreamWriterV2 implements AutoCloseable {
*/
private final String streamName;
+ /*
+ * The proto schema of rows to write.
+ */
+ private final ProtoSchema writerSchema;
+
/*
* Max allowed inflight requests in the stream. Method append is blocked at this.
*/
@@ -135,6 +140,7 @@ private StreamWriterV2(Builder builder) throws IOException {
this.hasMessageInWaitingQueue = lock.newCondition();
this.inflightReduced = lock.newCondition();
this.streamName = builder.streamName;
+ this.writerSchema = builder.writerSchema;
this.maxInflightRequests = builder.maxInflightRequest;
this.maxInflightBytes = builder.maxInflightBytes;
this.waitingRequestQueue = new LinkedList<AppendRequestAndResponse>();
@@ -188,10 +194,52 @@ public void run() {
* ApiFuture<AppendRowsResponse> messageIdFuture = writer.append(message);
* ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<AppendRowsResponse>() {
* public void onSuccess(AppendRowsResponse response) {
- * if (response.hasOffset()) {
- * System.out.println("written with offset: " + response.getOffset());
+ * if (!response.hasError()) {
+ * System.out.println("written with offset: " + response.getAppendResult().getOffset());
+ * } else {
+ * System.out.println("received an in stream error: " + response.getError().toString());
+ * }
+ * }
+ *
+ * public void onFailure(Throwable t) {
+ * System.out.println("failed to write: " + t);
+ * }
+ * }, MoreExecutors.directExecutor());
+ * }</pre>
+ *
+ * @param rows the rows in serialized format to write to BigQuery.
+ * @param offset the offset of the first row.
+ * @return the append response wrapped in a future.
+ */
+ public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
+ // TODO: Move this check to builder after the other append is removed.
+ if (this.writerSchema == null) {
+ throw new StatusRuntimeException(
+ Status.fromCode(Code.INVALID_ARGUMENT)
+ .withDescription("Writer schema must be provided when building this writer."));
+ }
+ AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
+ requestBuilder.setProtoRows(ProtoData.newBuilder().setRows(rows).build());
+ if (offset >= 0) {
+ requestBuilder.setOffset(Int64Value.of(offset));
+ }
+ return append(requestBuilder.build());
+ }
+
+ /**
+ * Schedules the writing of a message.
+ *
+ * <p>Example of writing a message.
+ *
+ * <pre>{@code
+ * AppendRowsRequest message;
+ * ApiFuture<AppendRowsResponse> messageIdFuture = writer.append(message);
+ * ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<AppendRowsResponse>() {
+ * public void onSuccess(AppendRowsResponse response) {
+ * if (!response.hasError()) {
+ * System.out.println("written with offset: " + response.getAppendResult().getOffset());
* } else {
- * System.out.println("received an in stream error: " + response.error().toString());
+ * System.out.println("received an in stream error: " + response.getError().toString());
* }
* }
*
@@ -202,8 +250,9 @@ public void run() {
* }</pre>
*
* @param message the message in serialized format to write to BigQuery.
- * @return the message ID wrapped in a future.
+ * @return the append response wrapped in a future.
*/
+ @Deprecated
public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message);
if (requestWrapper.messageSize > getApiMaxRequestBytes()) {
@@ -380,6 +429,9 @@ private AppendRowsRequest prepareRequestBasedOnPosition(
AppendRowsRequest original, boolean isFirstRequest) {
AppendRowsRequest.Builder requestBuilder = original.toBuilder();
if (isFirstRequest) {
+ if (this.writerSchema != null) {
+ requestBuilder.getProtoRowsBuilder().setWriterSchema(this.writerSchema);
+ }
requestBuilder.setWriteStream(this.streamName);
} else {
requestBuilder.clearWriteStream();
@@ -473,6 +525,8 @@ public static final class Builder {
private BigQueryWriteClient client;
+ private ProtoSchema writerSchema = null;
+
private long maxInflightRequest = DEFAULT_MAX_INFLIGHT_REQUESTS;
private long maxInflightBytes = DEFAULT_MAX_INFLIGHT_BYTES;
@@ -495,6 +549,12 @@ private Builder(String streamName, BigQueryWriteClient client) {
this.client = Preconditions.checkNotNull(client);
}
+ /** Sets the proto schema of the rows. */
+ public Builder setWriterSchema(ProtoSchema writerSchema) {
+ this.writerSchema = writerSchema;
+ return this;
+ }
+
public Builder setMaxInflightRequests(long value) {
this.maxInflightRequest = value;
return this;
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java
index f32487afb1..a6b4a1e790 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java
@@ -87,31 +87,39 @@ private StreamWriterV2 getTestStreamWriterV2() throws IOException {
return StreamWriterV2.newBuilder(TEST_STREAM, client).build();
}
- private AppendRowsRequest createAppendRequest(String[] messages, long offset) {
- AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
- AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder();
- dataBuilder.setWriterSchema(
- ProtoSchema.newBuilder()
- .setProtoDescriptor(
- DescriptorProtos.DescriptorProto.newBuilder()
- .setName("Message")
- .addField(
- DescriptorProtos.FieldDescriptorProto.newBuilder()
- .setName("foo")
- .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
- .setNumber(1)
- .build())
- .build()));
- ProtoRows.Builder rows = ProtoRows.newBuilder();
+ private ProtoSchema createProtoSchema() {
+ return ProtoSchema.newBuilder()
+ .setProtoDescriptor(
+ DescriptorProtos.DescriptorProto.newBuilder()
+ .setName("Message")
+ .addField(
+ DescriptorProtos.FieldDescriptorProto.newBuilder()
+ .setName("foo")
+ .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
+ .setNumber(1)
+ .build())
+ .build())
+ .build();
+ }
+
+ private ProtoRows createProtoRows(String[] messages) {
+ ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
for (String message : messages) {
FooType foo = FooType.newBuilder().setFoo(message).build();
- rows.addSerializedRows(foo.toByteString());
+ rowsBuilder.addSerializedRows(foo.toByteString());
}
+ return rowsBuilder.build();
+ }
+
+ private AppendRowsRequest createAppendRequest(String[] messages, long offset) {
+ AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
+ AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder();
+ dataBuilder.setWriterSchema(createProtoSchema());
if (offset > 0) {
requestBuilder.setOffset(Int64Value.of(offset));
}
return requestBuilder
- .setProtoRows(dataBuilder.setRows(rows.build()).build())
+ .setProtoRows(dataBuilder.setRows(createProtoRows(messages)).build())
.setWriteStream(TEST_STREAM)
.build();
}
@@ -166,6 +174,24 @@ public void run() {
appendThread.interrupt();
}
+ private void verifyAppendRequests(long appendCount) {
+ assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size());
+ for (int i = 0; i < appendCount; i++) {
+ AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i);
+ assertTrue(serverRequest.getProtoRows().getRows().getSerializedRowsCount() > 0);
+ assertEquals(i, serverRequest.getOffset().getValue());
+ if (i == 0) {
+ // First request received by server should have schema and stream name.
+ assertTrue(serverRequest.getProtoRows().hasWriterSchema());
+ assertEquals(serverRequest.getWriteStream(), TEST_STREAM);
+ } else {
+ // Following request should not have schema and stream name.
+ assertFalse(serverRequest.getProtoRows().hasWriterSchema());
+ assertEquals(serverRequest.getWriteStream(), "");
+ }
+ }
+ }
+
@Test
public void testBuildBigQueryWriteClientInWriter() throws Exception {
StreamWriterV2 writer =
@@ -181,40 +207,68 @@ public void testBuildBigQueryWriteClientInWriter() throws Exception {
}
@Test
- public void testAppendSuccess() throws Exception {
- StreamWriterV2 writer = getTestStreamWriterV2();
+ public void testAppendWithRowsSuccess() throws Exception {
+ StreamWriterV2 writer =
+ StreamWriterV2.newBuilder(TEST_STREAM, client).setWriterSchema(createProtoSchema()).build();
- long appendCount = 1000;
+ long appendCount = 100;
for (int i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
for (int i = 0; i < appendCount; i++) {
- futures.add(sendTestMessage(writer, new String[] {String.valueOf(i)}));
+ futures.add(writer.append(createProtoRows(new String[] {String.valueOf(i)}), i));
}
for (int i = 0; i < appendCount; i++) {
assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
}
- assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size());
+ verifyAppendRequests(appendCount);
+
+ writer.close();
+ }
+
+ @Test
+ public void testAppendWithMessageSuccess() throws Exception {
+ StreamWriterV2 writer = getTestStreamWriterV2();
+
+ long appendCount = 1000;
for (int i = 0; i < appendCount; i++) {
- AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i);
- if (i == 0) {
- // First request received by server should have schema and stream name.
- assertTrue(serverRequest.getProtoRows().hasWriterSchema());
- assertEquals(serverRequest.getWriteStream(), TEST_STREAM);
- } else {
- // Following request should not have schema and stream name.
- assertFalse(serverRequest.getProtoRows().hasWriterSchema());
- assertEquals(serverRequest.getWriteStream(), "");
- }
+ testBigQueryWrite.addResponse(createAppendResponse(i));
}
+ List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
+ for (int i = 0; i < appendCount; i++) {
+ futures.add(writer.append(createAppendRequest(new String[] {String.valueOf(i)}, i)));
+ }
+
+ for (int i = 0; i < appendCount; i++) {
+ assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
+ }
+
+ verifyAppendRequests(appendCount);
+
writer.close();
}
+ @Test
+ public void testAppendWithRowsNoSchema() throws Exception {
+ final StreamWriterV2 writer = getTestStreamWriterV2();
+ StatusRuntimeException ex =
+ assertThrows(
+ StatusRuntimeException.class,
+ new ThrowingRunnable() {
+ @Override
+ public void run() throws Throwable {
+ writer.append(createProtoRows(new String[] {"A"}), -1);
+ }
+ });
+ assertEquals(ex.getStatus().getCode(), Status.INVALID_ARGUMENT.getCode());
+ assertTrue(ex.getStatus().getDescription().contains("Writer schema must be provided"));
+ }
+
@Test
public void testAppendSuccessAndConnectionError() throws Exception {
StreamWriterV2 writer = getTestStreamWriterV2();