Skip to content

Commit

Permalink
feat: expose row level serialization failures for JsonStreamWriter ap…
Browse files Browse the repository at this point in the history
…pend (#1686)

* Expose row level serialization failures for JsonStreamWriter append

* Expose row level serialization failures for JsonStreamWriter append

* Expose row level serialization failures for JsonStreamWriter append

* Expose row level serialization failures for JsonStreamWriter append

* Expose row level serialization failures for JsonStreamWriter append

* Expose row level serialization failures for JsonStreamWriter append

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
noak2 and gcf-owl-bot[bot] authored Jul 7, 2022
1 parent 19decc3 commit bba0746
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.StatusProto;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -203,5 +204,29 @@ public static StorageException toStorageException(Throwable exception) {
return toStorageException(rpcStatus, exception);
}

/**
* This exception is thrown from {@link JsonStreamWriter#append()} when the client side Json to
* Proto serializtion fails. The exception contains a Map of indexes of faulty lines and the
* corresponding error message.
*/
public static class AppendSerializtionError extends RuntimeException {
private final Map<Integer, String> rowIndexToErrorMessage;
private final String streamName;

public AppendSerializtionError(String streamName, Map<Integer, String> rowIndexToErrorMessage) {
super(String.format("Append serializtion failed for writer: %s", streamName));
this.rowIndexToErrorMessage = rowIndexToErrorMessage;
this.streamName = streamName;
}

public Map<Integer, String> getRowIndexToErrorMessage() {
return rowIndexToErrorMessage;
}

public String getStreamName() {
return streamName;
}
}

private Exceptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -137,17 +140,27 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)

ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
// Any error in convertJsonToProtoMessage will throw an
// IllegalArgumentException/IllegalStateException/NullPointerException and will halt
// processing
// of JSON data.
// IllegalArgumentException/IllegalStateException/NullPointerException.
// IllegalArgumentException will be collected into a Map of row indexes to error messages.
// After the conversion is finished an AppendSerializtionError exception that contains all the
// conversion errors will be thrown.
long currentRequestSize = 0;
Map<Integer, String> rowIndexToErrorMessage = new HashMap<>();
for (int i = 0; i < jsonArr.length(); i++) {
JSONObject json = jsonArr.getJSONObject(i);
Message protoMessage =
JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
currentRequestSize += protoMessage.getSerializedSize();
try {
Message protoMessage =
JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
currentRequestSize += protoMessage.getSerializedSize();
} catch (IllegalArgumentException exception) {
rowIndexToErrorMessage.put(i, exception.getMessage());
}
}

if (!rowIndexToErrorMessage.isEmpty()) {
throw new AppendSerializtionError(streamName, rowIndexToErrorMessage);
}
final ApiFuture<AppendRowsResponse> appendResponseFuture =
this.streamWriter.append(rowsBuilder.build(), offset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@
import com.google.cloud.bigquery.storage.test.JsonTest;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Timestamp;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;
Expand Down Expand Up @@ -518,9 +520,11 @@ public void testWithoutIgnoreUnknownFields() throws Exception {
try {
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
Assert.fail("expected ExecutionException");
} catch (Exception ex) {
} catch (AppendSerializtionError ex) {
assertEquals(
ex.getMessage(), "JSONObject has fields unknown to BigQuery: root.test_unknown.");
"JSONObject has fields unknown to BigQuery: root.test_unknown.",
ex.getRowIndexToErrorMessage().get(1));
assertEquals(TEST_STREAM, ex.getStreamName());
}
}
}
Expand Down Expand Up @@ -603,4 +607,40 @@ public void testFlowControlSettingNoLimitBehavior() throws Exception {
appendFuture.get();
}
}

@Test
public void testMultipleAppendSerializtionErrors()
throws DescriptorValidationException, IOException, InterruptedException {
FooType expectedProto = FooType.newBuilder().setFoo("allen").build();
JSONObject foo = new JSONObject();
// put a field which is not part of the expected schema
foo.put("not_foo", "allen");
JSONObject foo1 = new JSONObject();
// put a vaild value into the field
foo1.put("foo", "allen");
JSONObject foo2 = new JSONObject();
// put a number into a string field
foo2.put("foo", 666);
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
jsonArr.put(foo1);
jsonArr.put(foo2);

try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
try {
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
Assert.fail("expected AppendSerializtionError");
} catch (AppendSerializtionError appendSerializtionError) {
Map<Integer, String> rowIndexToErrorMessage =
appendSerializtionError.getRowIndexToErrorMessage();
assertEquals(2, rowIndexToErrorMessage.size());
assertEquals(
"JSONObject has fields unknown to BigQuery: root.not_foo.",
rowIndexToErrorMessage.get(0));
assertEquals(
"JSONObject does not have a string field at root.foo.", rowIndexToErrorMessage.get(2));
}
}
}
}

0 comments on commit bba0746

Please sign in to comment.