Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose row level serialization failures for JsonStreamWriter append #1686

Merged
merged 7 commits into from
Jul 7, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.StatusProto;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -203,5 +204,29 @@ public static StorageException toStorageException(Throwable exception) {
return toStorageException(rpcStatus, exception);
}

/**
* This exception is thrown from {@link JsonStreamWriter#append()} when the client side Json to
* Proto serializtion fails. The exception contains a Map of indexes of faulty lines and the
* corresponding error message.
*/
public static class AppendSerializtionError extends RuntimeException {
private final Map<Integer, String> rowIndexToErrorMessage;
private final String streamName;

public AppendSerializtionError(String streamName, Map<Integer, String> rowIndexToErrorMessage) {
super(String.format("Append serializtion failed for writer: %s", streamName));
this.rowIndexToErrorMessage = rowIndexToErrorMessage;
this.streamName = streamName;
}

public Map<Integer, String> getRowIndexToErrorMessage() {
return rowIndexToErrorMessage;
}

public String getStreamName() {
return streamName;
}
}

private Exceptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -65,7 +68,7 @@ public class JsonStreamWriter implements AutoCloseable {
*/
private JsonStreamWriter(Builder builder)
throws Descriptors.DescriptorValidationException, IllegalArgumentException, IOException,
InterruptedException {
InterruptedException {
this.client = builder.client;
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema);
Expand Down Expand Up @@ -137,17 +140,27 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)

ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
// Any error in convertJsonToProtoMessage will throw an
// IllegalArgumentException/IllegalStateException/NullPointerException and will halt
// processing
// of JSON data.
// IllegalArgumentException/IllegalStateException/NullPointerException.
// IllegalArgumentException will be collected into a Map of row indexes to error messages.
// After the conversion is finished an AppendSerializtionError exception that contains all the
// conversion errors will be thrown.
long currentRequestSize = 0;
Map<Integer, String> rowIndexToErrorMessage = new HashMap<>();
for (int i = 0; i < jsonArr.length(); i++) {
JSONObject json = jsonArr.getJSONObject(i);
Message protoMessage =
JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
currentRequestSize += protoMessage.getSerializedSize();
try {
Message protoMessage =
JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
currentRequestSize += protoMessage.getSerializedSize();
} catch (IllegalArgumentException exception) {
rowIndexToErrorMessage.put(i, exception.getMessage());
}
}

if (!rowIndexToErrorMessage.isEmpty()) {
throw new AppendSerializtionError(streamName, rowIndexToErrorMessage);
}
final ApiFuture<AppendRowsResponse> appendResponseFuture =
this.streamWriter.append(rowsBuilder.build(), offset);
Expand Down Expand Up @@ -408,7 +421,7 @@ public Builder setReconnectAfter10M(boolean reconnectAfter10M) {
*/
public JsonStreamWriter build()
throws Descriptors.DescriptorValidationException, IllegalArgumentException, IOException,
InterruptedException {
InterruptedException {
return new JsonStreamWriter(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@
import com.google.cloud.bigquery.storage.test.JsonTest;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Timestamp;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;
Expand Down Expand Up @@ -518,9 +520,11 @@ public void testWithoutIgnoreUnknownFields() throws Exception {
try {
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
Assert.fail("expected ExecutionException");
} catch (Exception ex) {
} catch (AppendSerializtionError ex) {
noak2 marked this conversation as resolved.
Show resolved Hide resolved
assertEquals(
ex.getMessage(), "JSONObject has fields unknown to BigQuery: root.test_unknown.");
"JSONObject has fields unknown to BigQuery: root.test_unknown.",
ex.getRowIndexToErrorMessage().get(1));
assertEquals(TEST_STREAM, ex.getStreamName());
}
}
}
Expand Down Expand Up @@ -603,4 +607,40 @@ public void testFlowControlSettingNoLimitBehavior() throws Exception {
appendFuture.get();
}
}

@Test
public void testMultipleAppendSerializtionErrors()
noak2 marked this conversation as resolved.
Show resolved Hide resolved
throws DescriptorValidationException, IOException, InterruptedException {
FooType expectedProto = FooType.newBuilder().setFoo("allen").build();
JSONObject foo = new JSONObject();
noak2 marked this conversation as resolved.
Show resolved Hide resolved
// put a field which is not part of the expected schema
foo.put("not_foo", "allen");
JSONObject foo1 = new JSONObject();
// put a vaild value into the field
foo1.put("foo", "allen");
JSONObject foo2 = new JSONObject();
// put a number into a string field
foo2.put("foo", 666);
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
jsonArr.put(foo1);
jsonArr.put(foo2);

try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
try {
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
Assert.fail("expected AppendSerializtionError");
} catch (AppendSerializtionError appendSerializtionError) {
Map<Integer, String> rowIndexToErrorMessage =
appendSerializtionError.getRowIndexToErrorMessage();
assertEquals(2, rowIndexToErrorMessage.size());
assertEquals(
"JSONObject has fields unknown to BigQuery: root.not_foo.",
rowIndexToErrorMessage.get(0));
assertEquals(
"JSONObject does not have a string field at root.foo.", rowIndexToErrorMessage.get(2));
}
}
}
}