Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: client unknown fields drives writer refreshment #1797

Merged
merged 12 commits into from
Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class JsonStreamWriter implements AutoCloseable {
"projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+";
private static Pattern streamPattern = Pattern.compile(streamPatternString);
private static final Logger LOG = Logger.getLogger(JsonStreamWriter.class.getName());
private static final long UPDATE_SCHEMA_RETRY_INTERVAL_MILLIS = 30100L;

private BigQueryWriteClient client;
private String streamName;
Expand Down Expand Up @@ -77,6 +78,7 @@ private JsonStreamWriter(Builder builder)
streamWriterBuilder = StreamWriter.newBuilder(builder.streamName);
this.protoSchema = ProtoSchemaConverter.convert(this.descriptor);
this.totalMessageSize = protoSchema.getSerializedSize();
this.client = builder.client;
streamWriterBuilder.setWriterSchema(protoSchema);
setStreamWriterSettings(
builder.channelProvider,
Expand Down Expand Up @@ -108,6 +110,60 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr)
return append(jsonArr, -1);
}

/**
 * Tears down and rebuilds the underlying {@code StreamWriter} so that it uses {@code
 * updatedSchema}. Also refreshes this writer's cached {@code tableSchema}, proto {@code
 * descriptor}, {@code protoSchema}, and {@code totalMessageSize} to match the new schema.
 *
 * @param updatedSchema the new table schema to write with; must not be null
 * @throws DescriptorValidationException if the schema cannot be converted to a proto descriptor
 * @throws IOException if rebuilding the stream writer fails
 */
private void refreshWriter(TableSchema updatedSchema)
    throws DescriptorValidationException, IOException {
  Preconditions.checkNotNull(updatedSchema, "updatedSchema is null.");
  LOG.info("Refresh internal writer due to schema update, stream: " + this.streamName);
  // Close the current StreamWriter before swapping in a replacement built on the new schema.
  this.streamWriter.close();
  // Update JsonStreamWriter's TableSchema and Descriptor.
  this.tableSchema = updatedSchema;
  this.descriptor =
      BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(updatedSchema);
  this.protoSchema = ProtoSchemaConverter.convert(this.descriptor);
  this.totalMessageSize = protoSchema.getSerializedSize();
  // Create a new underlying StreamWriter with the updated TableSchema and Descriptor.
  this.streamWriter = streamWriterBuilder.setWriterSchema(this.protoSchema).build();
}

private Message buildMessage(JSONObject json)
throws InterruptedException, DescriptorValidationException, IOException {
try {
GaoleMeng marked this conversation as resolved.
Show resolved Hide resolved
return JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
} catch (Exceptions.JsonDataHasUnknownFieldException ex) {
// Backend cache for GetWriteStream schema staleness can be 30 seconds, wait a bit before
// trying to get the table schema to increase the chance of succeed. This is to avoid
// client's invalid datfa caused storm of GetWriteStream.
LOG.warning(
"Saw Json unknown field "
+ ex.getFieldName()
+ ", try to refresh the writer with updated schema, stream: "
+ streamName);
GetWriteStreamRequest writeStreamRequest =
yirutang marked this conversation as resolved.
Show resolved Hide resolved
GetWriteStreamRequest.newBuilder()
.setName(this.streamName)
.setView(WriteStreamView.FULL)
.build();
WriteStream writeStream = client.getWriteStream(writeStreamRequest);
refreshWriter(writeStream.getTableSchema());
try {
return JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
} catch (Exceptions.JsonDataHasUnknownFieldException exex) {
LOG.warning(
"First attempt failed, waiting for 30 seconds to retry, stream: " + this.streamName);
Thread.sleep(UPDATE_SCHEMA_RETRY_INTERVAL_MILLIS);
writeStream = client.getWriteStream(writeStreamRequest);
// TODO(yiru): We should let TableSchema return a timestamp so that we can simply
// compare the timestamp to see if the table schema is the same. If it is the
// same, we don't need to go refresh the writer again.
refreshWriter(writeStream.getTableSchema());
return JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
}
}
}
/**
* Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
* data to protobuf messages, then using StreamWriter's append() to write the data at the
Expand All @@ -126,17 +182,7 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
// Update schema only work when connection pool is not enabled.
if (this.streamWriter.getConnectionOperationType() == Kind.CONNECTION_WORKER
&& this.streamWriter.getUpdatedSchema() != null) {
TableSchema updatedSchema = this.streamWriter.getUpdatedSchema();
// Close the StreamWriter
this.streamWriter.close();
// Update JsonStreamWriter's TableSchema and Descriptor
this.tableSchema = updatedSchema;
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(updatedSchema);
this.protoSchema = ProtoSchemaConverter.convert(this.descriptor);
this.totalMessageSize = protoSchema.getSerializedSize();
// Create a new underlying StreamWriter with the updated TableSchema and Descriptor
this.streamWriter = streamWriterBuilder.setWriterSchema(this.protoSchema).build();
refreshWriter(this.streamWriter.getUpdatedSchema());
}

ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
Expand All @@ -150,9 +196,7 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
for (int i = 0; i < jsonArr.length(); i++) {
JSONObject json = jsonArr.getJSONObject(i);
try {
Message protoMessage =
JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
Message protoMessage = buildMessage(json);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
currentRequestSize += protoMessage.getSerializedSize();
} catch (IllegalArgumentException exception) {
Expand All @@ -169,6 +213,8 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
} else {
rowIndexToErrorMessage.put(i, exception.getMessage());
}
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
yirutang marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -277,7 +323,7 @@ public static Builder newBuilder(String streamOrTableName, TableSchema tableSche
*/
public static Builder newBuilder(
String streamOrTableName, TableSchema tableSchema, BigQueryWriteClient client) {
Preconditions.checkNotNull(streamOrTableName, "StreamName is null.");
Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null.");
Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
Preconditions.checkNotNull(client, "BigQuery client is null.");
return new Builder(streamOrTableName, tableSchema, client);
Expand Down Expand Up @@ -359,6 +405,7 @@ private Builder(String streamOrTableName, TableSchema tableSchema, BigQueryWrite

WriteStream writeStream = this.client.getWriteStream(writeStreamRequest);
TableSchema writeStreamTableSchema = writeStream.getTableSchema();

this.tableSchema = writeStreamTableSchema;
} else {
this.tableSchema = tableSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Timestamp;
Expand Down Expand Up @@ -67,6 +68,7 @@ public class JsonStreamWriterTest {
private FakeScheduledExecutorService fakeExecutor;
private FakeBigQueryWrite testBigQueryWrite;
private static MockServiceHelper serviceHelper;
private BigQueryWriteClient client;

private final TableFieldSchema FOO =
TableFieldSchema.newBuilder()
Expand Down Expand Up @@ -116,14 +118,15 @@ public void setUp() throws Exception {
channelProvider = serviceHelper.createChannelProvider();
fakeExecutor = new FakeScheduledExecutorService();
testBigQueryWrite.setExecutor(fakeExecutor);
BigQueryWriteSettings settings =
BigQueryWriteSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.build();
client = BigQueryWriteClient.create(settings);
Instant time = Instant.now();
Timestamp timestamp =
Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
// Add enough GetWriteStream response.
for (int i = 0; i < 4; i++) {
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setCreateTime(timestamp).build());
}
}

@After
Expand All @@ -133,7 +136,7 @@ public void tearDown() throws Exception {

private JsonStreamWriter.Builder getTestJsonStreamWriterBuilder(
String testStream, TableSchema BQTableSchema) {
return JsonStreamWriter.newBuilder(testStream, BQTableSchema)
return JsonStreamWriter.newBuilder(testStream, BQTableSchema, client)
.setChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create());
}
Expand Down Expand Up @@ -507,8 +510,85 @@ public void testSimpleSchemaUpdate() throws Exception {
}

@Test
public void testWithoutIgnoreUnknownFields() throws Exception {
public void testWithoutIgnoreUnknownFieldsUpdateImmeidateSuccess() throws Exception {
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
TableSchema updatedSchema =
TableSchema.newBuilder()
.addFields(0, TEST_INT)
.addFields(
1,
TableFieldSchema.newBuilder()
.setName("test_string")
.setType(TableFieldSchema.Type.STRING)
.setMode(Mode.NULLABLE))
.build();
// GetWriteStream is called once and the writer is fixed to accept unknown fields.
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(updatedSchema).build());
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {
JSONObject foo = new JSONObject();
foo.put("test_int", 10);
JSONObject bar = new JSONObject();
bar.put("test_string", "a");
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
jsonArr.put(bar);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
appendFuture.get();
}
}

@Test
public void testWithoutIgnoreUnknownFieldsUpdateSecondSuccess() throws Exception {
  // The writer begins with a schema containing only test_int.
  TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
  TableFieldSchema stringField =
      TableFieldSchema.newBuilder()
          .setName("test_string")
          .setType(TableFieldSchema.Type.STRING)
          .setMode(Mode.NULLABLE)
          .build();
  TableSchema updatedSchema =
      TableSchema.newBuilder().addFields(0, TEST_INT).addFields(1, stringField).build();
  // First GetWriteStream still serves the stale schema; only the second returns the update,
  // exercising the writer's delayed-retry path before the append finally succeeds.
  testBigQueryWrite.addResponse(
      WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(tableSchema).build());
  testBigQueryWrite.addResponse(
      WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(updatedSchema).build());
  testBigQueryWrite.addResponse(
      AppendRowsResponse.newBuilder()
          .setAppendResult(
              AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
          .build());
  try (JsonStreamWriter writer =
      getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {
    JSONObject knownFieldRow = new JSONObject();
    knownFieldRow.put("test_int", 10);
    JSONObject unknownFieldRow = new JSONObject();
    unknownFieldRow.put("test_string", "a");
    JSONArray rows = new JSONArray().put(knownFieldRow).put(unknownFieldRow);
    ApiFuture<AppendRowsResponse> response = writer.append(rows);
    response.get();
  }
}

@Test
public void testWithoutIgnoreUnknownFieldsUpdateFail() throws Exception {
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
// GetWriteStream is called once but failed to update to the right schema.
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(tableSchema).build());
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(tableSchema).build());
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {
JSONObject foo = new JSONObject();
Expand Down Expand Up @@ -626,6 +706,10 @@ public void testMultipleAppendSerializtionErrors()
jsonArr.put(foo);
jsonArr.put(foo1);
jsonArr.put(foo2);
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(TABLE_SCHEMA).build());
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(TABLE_SCHEMA).build());

try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
Expand Down