Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose row level serialization failures for JsonStreamWriter append #1686

Merged
merged 7 commits into from
Jul 7, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.StatusProto;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -203,5 +204,31 @@ public static StorageException toStorageException(Throwable exception) {
return toStorageException(rpcStatus, exception);
}

/**
 * Thrown from {@link JsonStreamWriter#append} when the client-side JSON-to-proto serialization
 * fails. The exception carries a list of {@link RowError} objects that represent all of the
 * faulty rows. {@link RowError} can also be returned as part of {@link AppendRowsResponse} after
 * the rows were processed on the server side; using the same error object should make error
 * handling easier.
 *
 * <p>The row errors are snapshotted at construction time and exposed as an unmodifiable list, so
 * later changes to the list passed by the caller do not affect this exception.
 */
public static class AppendSerializtionError extends RuntimeException {
  private final List<RowError> rowErrors;
  private final String streamName;

  /**
   * Creates the exception for a failed append.
   *
   * @param streamName name of the stream the writer appends to; may be {@code null}
   * @param rowErrors one entry per row that failed to serialize; {@code null} is treated as an
   *     empty list
   */
  public AppendSerializtionError(@Nullable String streamName, List<RowError> rowErrors) {
    super(String.format("Append serializtion failed for writer: %s", streamName));
    // Defensive copy: keep the exception's state independent of the caller's (mutable) list.
    this.rowErrors =
        rowErrors == null
            ? java.util.Collections.<RowError>emptyList()
            : java.util.Collections.unmodifiableList(new java.util.ArrayList<>(rowErrors));
    this.streamName = streamName;
  }

  /**
   * Returns the per-row serialization failures.
   *
   * @return an unmodifiable list of row errors; never {@code null}
   */
  public List<RowError> getRowErrors() {
    return rowErrors;
  }

  /**
   * Returns the stream name supplied at construction time.
   *
   * @return the stream name, or {@code null} if none was given
   */
  @Nullable
  public String getStreamName() {
    return streamName;
  }
}

// Private constructor: prevents instantiation from outside this class.
private Exceptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.cloud.bigquery.storage.v1.RowError.RowErrorCode;
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -65,7 +69,7 @@ public class JsonStreamWriter implements AutoCloseable {
*/
private JsonStreamWriter(Builder builder)
throws Descriptors.DescriptorValidationException, IllegalArgumentException, IOException,
InterruptedException {
InterruptedException {
this.client = builder.client;
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema);
Expand Down Expand Up @@ -137,17 +141,33 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)

ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
// Any error in convertJsonToProtoMessage will throw an
// IllegalArgumentException/IllegalStateException/NullPointerException and will halt
// processing
// of JSON data.
// IllegalArgumentException/IllegalStateException/NullPointerException which will be collected
// into a list of RowErrors. After the conversion is finished, an AppendSerializtionError
// exception that contains all the conversion errors will be thrown.
long currentRequestSize = 0;
List<RowError> rowErrors = new ArrayList<>();
for (int i = 0; i < jsonArr.length(); i++) {
JSONObject json = jsonArr.getJSONObject(i);
Message protoMessage =
JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
currentRequestSize += protoMessage.getSerializedSize();
try {
Message protoMessage =
JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
currentRequestSize += protoMessage.getSerializedSize();
} catch (IllegalArgumentException
| IllegalStateException
| NullPointerException exception) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to catch all of these? IllegalState is ambiguous, but NullPointerException seems like a more serious error indicating something is wrong, rather than a "row-level" error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In JsonToProtoMessage#convertJsonToProtoMessage there are some null and state checks. Some of them are not relevant to a single row and some are: for example, Preconditions.checkState(json.length() != 0, "JSONObject is empty."); is relevant to a single row and should be reported as such.

I'm not sure how we can even get to a point from JsonStreamWriter that we will have null values in JsonObjects, so I will remove the catch for null and state exceptions for now, and add them later if we see it's needed

rowErrors.add(
RowError.newBuilder()
.setIndex(i)
.setCode(RowErrorCode.FIELDS_ERROR)
noak2 marked this conversation as resolved.
Show resolved Hide resolved
.setMessage(exception.getMessage())
.build());
}
}

if (!rowErrors.isEmpty()) {
throw new AppendSerializtionError(streamName, rowErrors);
}
final ApiFuture<AppendRowsResponse> appendResponseFuture =
this.streamWriter.append(rowsBuilder.build(), offset);
Expand Down Expand Up @@ -408,7 +428,7 @@ public Builder setReconnectAfter10M(boolean reconnectAfter10M) {
*/
public JsonStreamWriter build()
throws Descriptors.DescriptorValidationException, IllegalArgumentException, IOException,
InterruptedException {
InterruptedException {
return new JsonStreamWriter(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@
import com.google.cloud.bigquery.storage.test.JsonTest;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.cloud.bigquery.storage.v1.RowError.RowErrorCode;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Timestamp;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;
Expand Down Expand Up @@ -518,9 +521,10 @@ public void testWithoutIgnoreUnknownFields() throws Exception {
try {
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
Assert.fail("expected ExecutionException");
} catch (Exception ex) {
} catch (AppendSerializtionError ex) {
noak2 marked this conversation as resolved.
Show resolved Hide resolved
assertEquals(
ex.getMessage(), "JSONObject has fields unknown to BigQuery: root.test_unknown.");
ex.getRowErrors().get(0).getMessage(),
"JSONObject has fields unknown to BigQuery: root.test_unknown.");
}
}
}
Expand Down Expand Up @@ -603,4 +607,45 @@ public void testFlowControlSettingNoLimitBehavior() throws Exception {
appendFuture.get();
}
}

@Test
public void testMultipleAppendSerializtionErrors()
noak2 marked this conversation as resolved.
Show resolved Hide resolved
throws DescriptorValidationException, IOException, InterruptedException {
FooType expectedProto = FooType.newBuilder().setFoo("allen").build();
JSONObject foo = new JSONObject();
noak2 marked this conversation as resolved.
Show resolved Hide resolved
foo.put("not_foo", "allen");
JSONObject foo1 = new JSONObject();
foo1.put("foo", "allen");
JSONObject foo2 = new JSONObject();
foo2.put("foo", 666);
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
jsonArr.put(foo1);
jsonArr.put(foo2);
RowError columnMismatchError =
RowError.newBuilder()
.setCode(RowErrorCode.FIELDS_ERROR)
.setIndex(0)
.setMessage("JSONObject has fields unknown to BigQuery: root.not_foo.")
.build();
RowError typeError =
RowError.newBuilder()
.setCode(RowErrorCode.FIELDS_ERROR)
.setIndex(2)
.setMessage("JSONObject does not have a string field at root.foo.")
.build();

try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
try {
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
Assert.fail("expected AppendSerializtionError");
} catch (AppendSerializtionError appendSerializtionError) {
List<RowError> rowErrors = appendSerializtionError.getRowErrors();
assertEquals(2, rowErrors.size());
assertEquals(columnMismatchError, rowErrors.get(0));
assertEquals(typeError, rowErrors.get(1));
}
}
}
}