Skip to content

Commit

Permalink
Merge 8361971 into f7c645a
Browse files Browse the repository at this point in the history
  • Loading branch information
yayi-google authored Mar 8, 2021
2 parents f7c645a + 8361971 commit 27c5a78
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ private StreamWriterV2(Builder builder) throws IOException {
this.hasMessageInWaitingQueue = lock.newCondition();
this.inflightReduced = lock.newCondition();
this.streamName = builder.streamName;
if (builder.writerSchema == null) {
throw new StatusRuntimeException(
Status.fromCode(Code.INVALID_ARGUMENT)
.withDescription("Writer schema must be provided when building this writer."));
}
this.writerSchema = builder.writerSchema;
this.maxInflightRequests = builder.maxInflightRequest;
this.maxInflightBytes = builder.maxInflightBytes;
Expand Down Expand Up @@ -216,48 +221,15 @@ public void run() {
* @return the append response wrapped in a future.
*/
public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
// TODO: Move this check to builder after the other append is removed.
if (this.writerSchema == null) {
throw new StatusRuntimeException(
Status.fromCode(Code.INVALID_ARGUMENT)
.withDescription("Writer schema must be provided when building this writer."));
}
AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
requestBuilder.setProtoRows(ProtoData.newBuilder().setRows(rows).build());
if (offset >= 0) {
requestBuilder.setOffset(Int64Value.of(offset));
}
return append(requestBuilder.build());
return appendInternal(requestBuilder.build());
}

/**
* Schedules the writing of a message.
*
* <p>Example of writing a message.
*
* <pre>{@code
* AppendRowsRequest message;
* ApiFuture<AppendRowsResponse> messageIdFuture = writer.append(message);
* ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<AppendRowsResponse>() {
* public void onSuccess(AppendRowsResponse response) {
* if (!response.hasError()) {
* System.out.println("written with offset: " + response.getAppendResult().getOffset());
* } else {
* System.out.println("received an in stream error: " + response.getError().toString());
* }
* }
*
* public void onFailure(Throwable t) {
* System.out.println("failed to write: " + t);
* }
* }, MoreExecutors.directExecutor());
* }</pre>
*
* @param message the message in serialized format to write to BigQuery.
* @return the append response wrapped in a future.
*/
@Deprecated
public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message) {
AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message);
if (requestWrapper.messageSize > getApiMaxRequestBytes()) {
requestWrapper.appendResult.setException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ public void tearDown() throws Exception {
}

private StreamWriterV2 getTestStreamWriterV2() throws IOException {
return StreamWriterV2.newBuilder(TEST_STREAM, client).setTraceId(TEST_TRACE_ID).build();
return StreamWriterV2.newBuilder(TEST_STREAM, client)
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.build();
}

private ProtoSchema createProtoSchema() {
Expand All @@ -112,19 +115,6 @@ private ProtoRows createProtoRows(String[] messages) {
return rowsBuilder.build();
}

private AppendRowsRequest createAppendRequest(String[] messages, long offset) {
AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder();
dataBuilder.setWriterSchema(createProtoSchema());
if (offset > 0) {
requestBuilder.setOffset(Int64Value.of(offset));
}
return requestBuilder
.setProtoRows(dataBuilder.setRows(createProtoRows(messages)).build())
.setWriteStream(TEST_STREAM)
.build();
}

private AppendRowsResponse createAppendResponse(long offset) {
return AppendRowsResponse.newBuilder()
.setAppendResult(
Expand All @@ -139,7 +129,7 @@ private AppendRowsResponse createAppendResponseWithError(Status.Code code, Strin
}

private ApiFuture<AppendRowsResponse> sendTestMessage(StreamWriterV2 writer, String[] messages) {
return writer.append(createAppendRequest(messages, -1));
return writer.append(createProtoRows(messages), -1);
}

private static <T extends Throwable> T assertFutureException(
Expand Down Expand Up @@ -201,6 +191,7 @@ public void testBuildBigQueryWriteClientInWriter() throws Exception {
StreamWriterV2.newBuilder(TEST_STREAM)
.setCredentialsProvider(NoCredentialsProvider.create())
.setChannelProvider(serviceHelper.createChannelProvider())
.setWriterSchema(createProtoSchema())
.build();

testBigQueryWrite.addResponse(createAppendResponse(0));
Expand All @@ -210,12 +201,8 @@ public void testBuildBigQueryWriteClientInWriter() throws Exception {
}

@Test
public void testAppendWithRowsSuccess() throws Exception {
StreamWriterV2 writer =
StreamWriterV2.newBuilder(TEST_STREAM, client)
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.build();
public void testAppendSuccess() throws Exception {
StreamWriterV2 writer = getTestStreamWriterV2();

long appendCount = 100;
for (int i = 0; i < appendCount; i++) {
Expand All @@ -237,38 +224,14 @@ public void testAppendWithRowsSuccess() throws Exception {
}

@Test
public void testAppendWithMessageSuccess() throws Exception {
StreamWriterV2 writer = getTestStreamWriterV2();

long appendCount = 1000;
for (int i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}

List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
for (int i = 0; i < appendCount; i++) {
futures.add(writer.append(createAppendRequest(new String[] {String.valueOf(i)}, i)));
}

for (int i = 0; i < appendCount; i++) {
assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
}

verifyAppendRequests(appendCount);

writer.close();
}

@Test
public void testAppendWithRowsNoSchema() throws Exception {
final StreamWriterV2 writer = getTestStreamWriterV2();
public void testNoSchema() throws Exception {
StatusRuntimeException ex =
assertThrows(
StatusRuntimeException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
writer.append(createProtoRows(new String[] {"A"}), -1);
StreamWriterV2.newBuilder(TEST_STREAM, client).build();
}
});
assertEquals(ex.getStatus().getCode(), Status.INVALID_ARGUMENT.getCode());
Expand Down Expand Up @@ -455,7 +418,10 @@ public void serverCloseWhileRequestsInflight() throws Exception {
@Test
public void testZeroMaxInflightRequests() throws Exception {
StreamWriterV2 writer =
StreamWriterV2.newBuilder(TEST_STREAM, client).setMaxInflightRequests(0).build();
StreamWriterV2.newBuilder(TEST_STREAM, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightRequests(0)
.build();
testBigQueryWrite.addResponse(createAppendResponse(0));
verifyAppendIsBlocked(writer);
writer.close();
Expand All @@ -464,7 +430,10 @@ public void testZeroMaxInflightRequests() throws Exception {
@Test
public void testZeroMaxInflightBytes() throws Exception {
StreamWriterV2 writer =
StreamWriterV2.newBuilder(TEST_STREAM, client).setMaxInflightBytes(0).build();
StreamWriterV2.newBuilder(TEST_STREAM, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightBytes(0)
.build();
testBigQueryWrite.addResponse(createAppendResponse(0));
verifyAppendIsBlocked(writer);
writer.close();
Expand All @@ -473,7 +442,10 @@ public void testZeroMaxInflightBytes() throws Exception {
@Test
public void testOneMaxInflightRequests() throws Exception {
StreamWriterV2 writer =
StreamWriterV2.newBuilder(TEST_STREAM, client).setMaxInflightRequests(1).build();
StreamWriterV2.newBuilder(TEST_STREAM, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightRequests(1)
.build();
// Server will sleep 1 second before every response.
testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1));
testBigQueryWrite.addResponse(createAppendResponse(0));
Expand All @@ -489,7 +461,10 @@ public void testOneMaxInflightRequests() throws Exception {
@Test
public void testAppendsWithTinyMaxInflightBytes() throws Exception {
StreamWriterV2 writer =
StreamWriterV2.newBuilder(TEST_STREAM, client).setMaxInflightBytes(1).build();
StreamWriterV2.newBuilder(TEST_STREAM, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightBytes(1)
.build();
// Server will sleep 100ms before every response.
testBigQueryWrite.setResponseSleep(Duration.ofMillis(100));
long appendCount = 10;
Expand All @@ -500,7 +475,7 @@ public void testAppendsWithTinyMaxInflightBytes() throws Exception {
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
long appendStartTimeMs = System.currentTimeMillis();
for (int i = 0; i < appendCount; i++) {
futures.add(writer.append(createAppendRequest(new String[] {String.valueOf(i)}, i)));
futures.add(writer.append(createProtoRows(new String[] {String.valueOf(i)}), i));
}
long appendElapsedMs = System.currentTimeMillis() - appendStartTimeMs;
assertTrue(appendElapsedMs >= 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.BQTableSchemaToProtoDescriptor;
import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient;
Expand All @@ -35,7 +34,6 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Message;
import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -163,16 +161,19 @@ private void writeToStream(
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(
writeStream.getTableSchema());
ProtoSchema protoSchema = ProtoSchemaConverter.convert(descriptor);
try (StreamWriterV2 writer = StreamWriterV2.newBuilder(writeStream.getName()).build()) {
try (StreamWriterV2 writer =
StreamWriterV2.newBuilder(writeStream.getName())
.setWriterSchema(protoSchema)
.setTraceId("SAMPLE:parallel_append")
.build()) {
while (System.currentTimeMillis() < deadlineMillis) {
synchronized (this) {
if (error != null) {
// Stop writing once we get an error.
throw error;
}
}
ApiFuture<AppendRowsResponse> future =
writer.append(createAppendRequest(writeStream.getName(), descriptor, protoSchema, -1));
ApiFuture<AppendRowsResponse> future = writer.append(createAppendRows(descriptor), -1);
synchronized (this) {
inflightCount++;
}
Expand All @@ -197,8 +198,7 @@ private void waitForInflightToReachZero(Duration timeout) {
throw new RuntimeException("Timeout waiting for inflight count to reach 0");
}

private AppendRowsRequest createAppendRequest(
String streamName, Descriptor descriptor, ProtoSchema protoSchema, long offset) {
private ProtoRows createAppendRows(Descriptor descriptor) {
ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
for (int i = 0; i < BATCH_SIZE; i++) {
byte[] payload = new byte[ROW_SIZE];
Expand All @@ -208,15 +208,7 @@ private AppendRowsRequest createAppendRequest(
Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(descriptor, record);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
}
AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder();
data.setWriterSchema(protoSchema);
data.setRows(rowsBuilder.build());
AppendRowsRequest.Builder request = AppendRowsRequest.newBuilder().setProtoRows(data.build());
request.setWriteStream(streamName);
if (offset >= 0) {
request.setOffset(Int64Value.of(offset));
}
return request.build();
return rowsBuilder.build();
}

private void sleepIgnoringInterruption(Duration duration) {
Expand Down

0 comments on commit 27c5a78

Please sign in to comment.