Skip to content

Commit

Permalink
feat: in StreamWriterV2, supports new append, which takes rows and of…
Browse files Browse the repository at this point in the history
…fset (#894)

* feat: StreamWriterV2 will attach stream name in the first request in the connection and remove stream name and schema in the following ones.

* feat: StreamWriterV2 will attach stream name in the first request in the connection and remove stream name and schema in the following ones

* feat: In StreamWriterV2, add a new append method accepting rows and offset

* remove duplicated code

* add unit test for append rows without schema
  • Loading branch information
yayi-google authored Mar 1, 2021
1 parent 74d20b2 commit f3865b0
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest.ProtoData;
import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.DoneCallback;
import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.RequestCallback;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
Expand All @@ -39,8 +41,6 @@
/**
* A BigQuery Stream Writer that can be used to write data into BigQuery Table.
*
* <p>TODO: Attach schema.
*
* <p>TODO: Attach traceId.
*
* <p>TODO: Support batching.
Expand All @@ -59,6 +59,11 @@ public class StreamWriterV2 implements AutoCloseable {
*/
private final String streamName;

/*
* The proto schema of rows to write.
*/
private final ProtoSchema writerSchema;

/*
* Max allowed inflight requests in the stream. Method append is blocked at this.
*/
Expand Down Expand Up @@ -135,6 +140,7 @@ private StreamWriterV2(Builder builder) throws IOException {
this.hasMessageInWaitingQueue = lock.newCondition();
this.inflightReduced = lock.newCondition();
this.streamName = builder.streamName;
this.writerSchema = builder.writerSchema;
this.maxInflightRequests = builder.maxInflightRequest;
this.maxInflightBytes = builder.maxInflightBytes;
this.waitingRequestQueue = new LinkedList<AppendRequestAndResponse>();
Expand Down Expand Up @@ -188,10 +194,52 @@ public void run() {
* ApiFuture<AppendRowsResponse> messageIdFuture = writer.append(message);
* ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<AppendRowsResponse>() {
* public void onSuccess(AppendRowsResponse response) {
* if (response.hasOffset()) {
* System.out.println("written with offset: " + response.getOffset());
* if (!response.hasError()) {
* System.out.println("written with offset: " + response.getAppendResult().getOffset());
* } else {
* System.out.println("received an in stream error: " + response.getError().toString());
* }
* }
*
* public void onFailure(Throwable t) {
* System.out.println("failed to write: " + t);
* }
* }, MoreExecutors.directExecutor());
* }</pre>
*
* @param rows the rows in serialized format to write to BigQuery.
* @param offset the offset of the first row.
* @return the append response wrapped in a future.
*/
public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
// TODO: Move this check to builder after the other append is removed.
if (this.writerSchema == null) {
throw new StatusRuntimeException(
Status.fromCode(Code.INVALID_ARGUMENT)
.withDescription("Writer schema must be provided when building this writer."));
}
AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
requestBuilder.setProtoRows(ProtoData.newBuilder().setRows(rows).build());
if (offset >= 0) {
requestBuilder.setOffset(Int64Value.of(offset));
}
return append(requestBuilder.build());
}

/**
* Schedules the writing of a message.
*
* <p>Example of writing a message.
*
* <pre>{@code
* AppendRowsRequest message;
* ApiFuture<AppendRowsResponse> messageIdFuture = writer.append(message);
* ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<AppendRowsResponse>() {
* public void onSuccess(AppendRowsResponse response) {
* if (!response.hasError()) {
* System.out.println("written with offset: " + response.getAppendResult().getOffset());
* } else {
* System.out.println("received an in stream error: " + response.error().toString());
* System.out.println("received an in stream error: " + response.getError().toString());
* }
* }
*
Expand All @@ -202,8 +250,9 @@ public void run() {
* }</pre>
*
* @param message the message in serialized format to write to BigQuery.
* @return the message ID wrapped in a future.
* @return the append response wrapped in a future.
*/
@Deprecated
public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message);
if (requestWrapper.messageSize > getApiMaxRequestBytes()) {
Expand Down Expand Up @@ -380,6 +429,9 @@ private AppendRowsRequest prepareRequestBasedOnPosition(
AppendRowsRequest original, boolean isFirstRequest) {
AppendRowsRequest.Builder requestBuilder = original.toBuilder();
if (isFirstRequest) {
if (this.writerSchema != null) {
requestBuilder.getProtoRowsBuilder().setWriterSchema(this.writerSchema);
}
requestBuilder.setWriteStream(this.streamName);
} else {
requestBuilder.clearWriteStream();
Expand Down Expand Up @@ -473,6 +525,8 @@ public static final class Builder {

private BigQueryWriteClient client;

private ProtoSchema writerSchema = null;

private long maxInflightRequest = DEFAULT_MAX_INFLIGHT_REQUESTS;

private long maxInflightBytes = DEFAULT_MAX_INFLIGHT_BYTES;
Expand All @@ -495,6 +549,12 @@ private Builder(String streamName, BigQueryWriteClient client) {
this.client = Preconditions.checkNotNull(client);
}

/** Sets the proto schema of the rows. */
public Builder setWriterSchema(ProtoSchema writerSchema) {
this.writerSchema = writerSchema;
return this;
}

public Builder setMaxInflightRequests(long value) {
this.maxInflightRequest = value;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,31 +87,39 @@ private StreamWriterV2 getTestStreamWriterV2() throws IOException {
return StreamWriterV2.newBuilder(TEST_STREAM, client).build();
}

private AppendRowsRequest createAppendRequest(String[] messages, long offset) {
AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder();
dataBuilder.setWriterSchema(
ProtoSchema.newBuilder()
.setProtoDescriptor(
DescriptorProtos.DescriptorProto.newBuilder()
.setName("Message")
.addField(
DescriptorProtos.FieldDescriptorProto.newBuilder()
.setName("foo")
.setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
.setNumber(1)
.build())
.build()));
ProtoRows.Builder rows = ProtoRows.newBuilder();
private ProtoSchema createProtoSchema() {
return ProtoSchema.newBuilder()
.setProtoDescriptor(
DescriptorProtos.DescriptorProto.newBuilder()
.setName("Message")
.addField(
DescriptorProtos.FieldDescriptorProto.newBuilder()
.setName("foo")
.setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
.setNumber(1)
.build())
.build())
.build();
}

private ProtoRows createProtoRows(String[] messages) {
ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
for (String message : messages) {
FooType foo = FooType.newBuilder().setFoo(message).build();
rows.addSerializedRows(foo.toByteString());
rowsBuilder.addSerializedRows(foo.toByteString());
}
return rowsBuilder.build();
}

private AppendRowsRequest createAppendRequest(String[] messages, long offset) {
AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder();
dataBuilder.setWriterSchema(createProtoSchema());
if (offset > 0) {
requestBuilder.setOffset(Int64Value.of(offset));
}
return requestBuilder
.setProtoRows(dataBuilder.setRows(rows.build()).build())
.setProtoRows(dataBuilder.setRows(createProtoRows(messages)).build())
.setWriteStream(TEST_STREAM)
.build();
}
Expand Down Expand Up @@ -166,6 +174,24 @@ public void run() {
appendThread.interrupt();
}

private void verifyAppendRequests(long appendCount) {
assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size());
for (int i = 0; i < appendCount; i++) {
AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i);
assertTrue(serverRequest.getProtoRows().getRows().getSerializedRowsCount() > 0);
assertEquals(i, serverRequest.getOffset().getValue());
if (i == 0) {
// First request received by server should have schema and stream name.
assertTrue(serverRequest.getProtoRows().hasWriterSchema());
assertEquals(serverRequest.getWriteStream(), TEST_STREAM);
} else {
// Following request should not have schema and stream name.
assertFalse(serverRequest.getProtoRows().hasWriterSchema());
assertEquals(serverRequest.getWriteStream(), "");
}
}
}

@Test
public void testBuildBigQueryWriteClientInWriter() throws Exception {
StreamWriterV2 writer =
Expand All @@ -181,40 +207,68 @@ public void testBuildBigQueryWriteClientInWriter() throws Exception {
}

@Test
public void testAppendSuccess() throws Exception {
StreamWriterV2 writer = getTestStreamWriterV2();
public void testAppendWithRowsSuccess() throws Exception {
StreamWriterV2 writer =
StreamWriterV2.newBuilder(TEST_STREAM, client).setWriterSchema(createProtoSchema()).build();

long appendCount = 1000;
long appendCount = 100;
for (int i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}

List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
for (int i = 0; i < appendCount; i++) {
futures.add(sendTestMessage(writer, new String[] {String.valueOf(i)}));
futures.add(writer.append(createProtoRows(new String[] {String.valueOf(i)}), i));
}

for (int i = 0; i < appendCount; i++) {
assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
}
assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size());

verifyAppendRequests(appendCount);

writer.close();
}

@Test
public void testAppendWithMessageSuccess() throws Exception {
StreamWriterV2 writer = getTestStreamWriterV2();

long appendCount = 1000;
for (int i = 0; i < appendCount; i++) {
AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i);
if (i == 0) {
// First request received by server should have schema and stream name.
assertTrue(serverRequest.getProtoRows().hasWriterSchema());
assertEquals(serverRequest.getWriteStream(), TEST_STREAM);
} else {
// Following request should not have schema and stream name.
assertFalse(serverRequest.getProtoRows().hasWriterSchema());
assertEquals(serverRequest.getWriteStream(), "");
}
testBigQueryWrite.addResponse(createAppendResponse(i));
}

List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
for (int i = 0; i < appendCount; i++) {
futures.add(writer.append(createAppendRequest(new String[] {String.valueOf(i)}, i)));
}

for (int i = 0; i < appendCount; i++) {
assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
}

verifyAppendRequests(appendCount);

writer.close();
}

@Test
public void testAppendWithRowsNoSchema() throws Exception {
final StreamWriterV2 writer = getTestStreamWriterV2();
StatusRuntimeException ex =
assertThrows(
StatusRuntimeException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
writer.append(createProtoRows(new String[] {"A"}), -1);
}
});
assertEquals(ex.getStatus().getCode(), Status.INVALID_ARGUMENT.getCode());
assertTrue(ex.getStatus().getDescription().contains("Writer schema must be provided"));
}

@Test
public void testAppendSuccessAndConnectionError() throws Exception {
StreamWriterV2 writer = getTestStreamWriterV2();
Expand Down

0 comments on commit f3865b0

Please sign in to comment.