Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
yirutang committed Sep 23, 2022
1 parent c87901b commit bd2640d
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
Expand Down Expand Up @@ -77,6 +76,7 @@ private JsonStreamWriter(Builder builder)
streamWriterBuilder = StreamWriter.newBuilder(builder.streamName);
this.protoSchema = ProtoSchemaConverter.convert(this.descriptor);
this.totalMessageSize = protoSchema.getSerializedSize();
this.client = builder.client;
streamWriterBuilder.setWriterSchema(protoSchema);
setStreamWriterSettings(
builder.channelProvider,
Expand Down Expand Up @@ -132,21 +132,29 @@ private Message buildMessage(JSONObject json)
} catch (Exceptions.JsonDataHasUnknownFieldException ex) {
// Backend cache for GetWriteStream schema staleness can be 30 seconds, wait a bit before
// trying to get the table schema to increase the chance of succeed. This is to avoid
// client's invalid data caused storm of GetWriteStream.
// client's invalid datfa caused storm of GetWriteStream.
LOG.warning(
"Saw Json unknown field "
+ ex.getFieldName()
+ ", try to refresh the writer with updated schema");
Thread.sleep(35000);
GetWriteStreamRequest writeStreamRequest =
GetWriteStreamRequest.newBuilder()
.setName(this.streamName)
.setView(WriteStreamView.FULL)
.build();
WriteStream writeStream = client.getWriteStream(writeStreamRequest);
refreshWriter(writeStream.getTableSchema());
return JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
try {
return JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
} catch (Exceptions.JsonDataHasUnknownFieldException exex) {
LOG.warning("First attempt failed, waiting for 30 seconds to retry");
Thread.sleep(30000);
writeStream = client.getWriteStream(writeStreamRequest);
refreshWriter(writeStream.getTableSchema());
return JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
}
}
}
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ public String toString() {
@Override
public void getWriteStream(
GetWriteStreamRequest request, StreamObserver<WriteStream> responseObserver) {
LOG.info("GetWriteStream called!");
Object response = writeResponses.remove();
if (response instanceof WriteStream) {
writeRequests.add(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ public void testSimpleSchemaUpdate() throws Exception {
}

@Test
public void testWithoutIgnoreUnknownFieldsUpdateSuccess() throws Exception {
public void testWithoutIgnoreUnknownFieldsUpdateImmeidateSuccess() throws Exception {
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
TableSchema updatedSchema =
TableSchema.newBuilder()
Expand Down Expand Up @@ -544,10 +544,49 @@ public void testWithoutIgnoreUnknownFieldsUpdateSuccess() throws Exception {
}
}

@Test
public void testWithoutIgnoreUnknownFieldsUpdateSecondSuccess() throws Exception {
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
TableSchema updatedSchema =
TableSchema.newBuilder()
.addFields(0, TEST_INT)
.addFields(
1,
TableFieldSchema.newBuilder()
.setName("test_string")
.setType(TableFieldSchema.Type.STRING)
.setMode(Mode.NULLABLE))
.build();
// GetWriteStream is called twice and got the updated schema
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(tableSchema).build());
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(updatedSchema).build());
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {
JSONObject foo = new JSONObject();
foo.put("test_int", 10);
JSONObject bar = new JSONObject();
bar.put("test_string", "a");
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
jsonArr.put(bar);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
appendFuture.get();
}
}

@Test
public void testWithoutIgnoreUnknownFieldsUpdateFail() throws Exception {
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
// GetWriteStream is called once but failed to update to the right schema.
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(tableSchema).build());
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(tableSchema).build());
try (JsonStreamWriter writer =
Expand Down

0 comments on commit bd2640d

Please sign in to comment.