From bba0746a13c785621c4e4cbd2239060d67ce155b Mon Sep 17 00:00:00 2001 From: noak2 <107106049+noak2@users.noreply.github.com> Date: Thu, 7 Jul 2022 22:56:37 +0300 Subject: [PATCH] feat: expose row level serialization failures for JsonStreamWriter append (#1686) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Expose row level serialization failures for JsonStreamWriter append * Expose row level serialization failures for JsonStreamWriter append * Expose row level serialization failures for JsonStreamWriter append * Expose row level serialization failures for JsonStreamWriter append * Expose row level serialization failures for JsonStreamWriter append * Expose row level serialization failures for JsonStreamWriter append * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot --- .../cloud/bigquery/storage/v1/Exceptions.java | 25 +++++++++++ .../bigquery/storage/v1/JsonStreamWriter.java | 29 ++++++++---- .../storage/v1/JsonStreamWriterTest.java | 44 ++++++++++++++++++- 3 files changed, 88 insertions(+), 10 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java index 5b02271a58..ba1051cd67 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java @@ -22,6 +22,7 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.protobuf.StatusProto; +import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nullable; @@ -203,5 +204,29 @@ public static StorageException toStorageException(Throwable exception) { return toStorageException(rpcStatus, 
exception); } + /** + * This exception is thrown from {@link JsonStreamWriter#append()} when the client side Json to + * Proto serialization fails. The exception contains a Map of indexes of faulty lines and the + * corresponding error message. + */ + public static class AppendSerializtionError extends RuntimeException { + private final Map rowIndexToErrorMessage; + private final String streamName; + + public AppendSerializtionError(String streamName, Map rowIndexToErrorMessage) { + super(String.format("Append serializtion failed for writer: %s", streamName)); + this.rowIndexToErrorMessage = rowIndexToErrorMessage; + this.streamName = streamName; + } + + public Map getRowIndexToErrorMessage() { + return rowIndexToErrorMessage; + } + + public String getStreamName() { + return streamName; + } + } + private Exceptions() {} } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 3186c282f4..a7d2a0a589 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -19,12 +19,15 @@ import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError; import com.google.common.base.Preconditions; import com.google.protobuf.Descriptors; import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.DescriptorValidationException; import com.google.protobuf.Message; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -137,17
+140,27 @@ public ApiFuture append(JSONArray jsonArr, long offset) ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder(); // Any error in convertJsonToProtoMessage will throw an - // IllegalArgumentException/IllegalStateException/NullPointerException and will halt - // processing - // of JSON data. + // IllegalArgumentException/IllegalStateException/NullPointerException. + // IllegalArgumentException will be collected into a Map of row indexes to error messages. + // After the conversion is finished an AppendSerializtionError exception that contains all the + // conversion errors will be thrown. long currentRequestSize = 0; + Map rowIndexToErrorMessage = new HashMap<>(); for (int i = 0; i < jsonArr.length(); i++) { JSONObject json = jsonArr.getJSONObject(i); - Message protoMessage = - JsonToProtoMessage.convertJsonToProtoMessage( - this.descriptor, this.tableSchema, json, ignoreUnknownFields); - rowsBuilder.addSerializedRows(protoMessage.toByteString()); - currentRequestSize += protoMessage.getSerializedSize(); + try { + Message protoMessage = + JsonToProtoMessage.convertJsonToProtoMessage( + this.descriptor, this.tableSchema, json, ignoreUnknownFields); + rowsBuilder.addSerializedRows(protoMessage.toByteString()); + currentRequestSize += protoMessage.getSerializedSize(); + } catch (IllegalArgumentException exception) { + rowIndexToErrorMessage.put(i, exception.getMessage()); + } + } + + if (!rowIndexToErrorMessage.isEmpty()) { + throw new AppendSerializtionError(streamName, rowIndexToErrorMessage); } final ApiFuture appendResponseFuture = this.streamWriter.append(rowsBuilder.build(), offset); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index 4a19f17e24..434659dee7 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java 
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -31,6 +31,7 @@ import com.google.cloud.bigquery.storage.test.JsonTest; import com.google.cloud.bigquery.storage.test.Test.FooType; import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType; +import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError; import com.google.protobuf.Descriptors.DescriptorValidationException; import com.google.protobuf.Int64Value; import com.google.protobuf.Timestamp; @@ -38,6 +39,7 @@ import io.grpc.StatusRuntimeException; import java.io.IOException; import java.util.Arrays; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.logging.Logger; @@ -518,9 +520,11 @@ public void testWithoutIgnoreUnknownFields() throws Exception { try { ApiFuture appendFuture = writer.append(jsonArr); Assert.fail("expected ExecutionException"); - } catch (Exception ex) { + } catch (AppendSerializtionError ex) { assertEquals( - ex.getMessage(), "JSONObject has fields unknown to BigQuery: root.test_unknown."); + "JSONObject has fields unknown to BigQuery: root.test_unknown.", + ex.getRowIndexToErrorMessage().get(1)); + assertEquals(TEST_STREAM, ex.getStreamName()); } } } @@ -603,4 +607,40 @@ public void testFlowControlSettingNoLimitBehavior() throws Exception { appendFuture.get(); } } + + @Test + public void testMultipleAppendSerializtionErrors() + throws DescriptorValidationException, IOException, InterruptedException { + FooType expectedProto = FooType.newBuilder().setFoo("allen").build(); + JSONObject foo = new JSONObject(); + // put a field which is not part of the expected schema + foo.put("not_foo", "allen"); + JSONObject foo1 = new JSONObject(); + // put a valid value into the field + foo1.put("foo", "allen"); + JSONObject foo2 = new JSONObject(); + // put a number into a string field + foo2.put("foo", 666); + JSONArray jsonArr = new JSONArray(); +
jsonArr.put(foo); + jsonArr.put(foo1); + jsonArr.put(foo2); + + try (JsonStreamWriter writer = + getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { + try { + ApiFuture appendFuture = writer.append(jsonArr); + Assert.fail("expected AppendSerializtionError"); + } catch (AppendSerializtionError appendSerializtionError) { + Map rowIndexToErrorMessage = + appendSerializtionError.getRowIndexToErrorMessage(); + assertEquals(2, rowIndexToErrorMessage.size()); + assertEquals( + "JSONObject has fields unknown to BigQuery: root.not_foo.", + rowIndexToErrorMessage.get(0)); + assertEquals( + "JSONObject does not have a string field at root.foo.", rowIndexToErrorMessage.get(2)); + } + } + } }