Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for flexible column name in JsonStreamWriter #1786

Merged
merged 12 commits into from
Sep 28, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldOptions;
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
Expand Down Expand Up @@ -92,7 +93,11 @@ private static Descriptor convertBQTableSchemaToProtoDescriptorImpl(
List<FieldDescriptorProto> fields = new ArrayList<FieldDescriptorProto>();
int index = 1;
for (TableFieldSchema BQTableField : BQTableSchema.getFieldsList()) {
String currentScope = scope + "__" + BQTableField.getName();
String scopeName =
BigQuerySchemaUtil.isProtoCompatible(BQTableField.getName())
? BQTableField.getName()
: BigQuerySchemaUtil.generatePlaceholderFieldName(BQTableField.getName());
String currentScope = scope + "__" + scopeName;
if (BQTableField.getType() == TableFieldSchema.Type.STRUCT) {
ImmutableList<TableFieldSchema> fieldList =
ImmutableList.copyOf(BQTableField.getFieldsList());
Expand Down Expand Up @@ -137,19 +142,26 @@ private static FieldDescriptorProto convertBQTableFieldToProtoField(
TableFieldSchema BQTableField, int index, String scope) {
TableFieldSchema.Mode mode = BQTableField.getMode();
String fieldName = BQTableField.getName().toLowerCase();

FieldDescriptorProto.Builder fieldDescriptor =
FieldDescriptorProto.newBuilder()
.setName(fieldName)
.setNumber(index)
.setLabel((FieldDescriptorProto.Label) BQTableSchemaModeMap.get(mode));

if (BQTableField.getType() == TableFieldSchema.Type.STRUCT) {
return FieldDescriptorProto.newBuilder()
.setName(fieldName)
.setTypeName(scope)
.setLabel((FieldDescriptorProto.Label) BQTableSchemaModeMap.get(mode))
.setNumber(index)
.build();
fieldDescriptor.setTypeName(scope);
} else {
fieldDescriptor.setType(
(FieldDescriptorProto.Type) BQTableSchemaTypeMap.get(BQTableField.getType()));
}

// Sets columnName annotation when field name is not proto comptaible.
if (!BigQuerySchemaUtil.isProtoCompatible(fieldName)) {
fieldDescriptor.setName(BigQuerySchemaUtil.generatePlaceholderFieldName(fieldName));
fieldDescriptor.setOptions(
FieldOptions.newBuilder().setExtension(AnnotationsProto.columnName, fieldName).build());
}
return FieldDescriptorProto.newBuilder()
.setName(fieldName)
.setType((FieldDescriptorProto.Type) BQTableSchemaTypeMap.get(BQTableField.getType()))
.setLabel((FieldDescriptorProto.Label) BQTableSchemaModeMap.get(mode))
.setNumber(index)
.build();
return fieldDescriptor.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1;

import com.google.protobuf.Descriptors.FieldDescriptor;
import java.util.Base64;
import java.util.regex.Pattern;

public class BigQuerySchemaUtil {

private static final String PROTO_COMPATIBLE_NAME_REGEXP = "[A-Za-z_][A-Za-z0-9_]*";
private static final String PLACEHOLDER_FILED_NAME_PREFIX = "col_";
private static final Pattern PROTO_COMPATIBLE_NAME_PATTERN =
Pattern.compile(PROTO_COMPATIBLE_NAME_REGEXP);

/**
* * Checks if the field name is compatible with proto field naming convention.
*
* @param fieldName name for the field
* @return true if the field name is comptaible with proto naming convention, otherwise, returns
* false.
*/
public static boolean isProtoCompatible(String fieldName) {
return PROTO_COMPATIBLE_NAME_PATTERN.matcher(fieldName).matches();
}

/**
* * Generates a placeholder name that consists of a prefix + base64 encoded field name. We
* replace all dashes with underscores as they are not allowed for proto field names.
*
* @param fieldName name for the field
* @return the generated placeholder field name
*/
public static String generatePlaceholderFieldName(String fieldName) {
return PLACEHOLDER_FILED_NAME_PREFIX
+ Base64.getUrlEncoder()
.withoutPadding()
.encodeToString(fieldName.getBytes())
.replace('-', '_');
}

/**
* * Gets the user-facing field name from the descriptor
*
* @param fieldDescriptor
* @return columnName annotation if present, otherwise return the field name.
*/
public static String getFieldName(FieldDescriptor fieldDescriptor) {
return fieldDescriptor.getOptions().hasExtension(AnnotationsProto.columnName)
? fieldDescriptor.getOptions().getExtension(AnnotationsProto.columnName)
: fieldDescriptor.getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,15 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
String jsonName = jsonNames[i];
// We want lowercase here to support case-insensitive data writes.
// The protobuf descriptor that is used is assumed to have all lowercased fields
String jsonLowercaseName = jsonName.toLowerCase();
String jsonFieldLocator = jsonName.toLowerCase();

// If jsonName is not compatible with proto naming convention, we should look by its
// placeholder name.
if (!BigQuerySchemaUtil.isProtoCompatible(jsonFieldLocator)) {
jsonFieldLocator = BigQuerySchemaUtil.generatePlaceholderFieldName(jsonFieldLocator);
}
String currentScope = jsonScope + "." + jsonName;
FieldDescriptor field = protoSchema.findFieldByName(jsonLowercaseName);
FieldDescriptor field = protoSchema.findFieldByName(jsonFieldLocator);
if (field == null && !ignoreUnknownFields) {
throw new IllegalArgumentException(
String.format("JSONObject has fields unknown to BigQuery: %s.", currentScope));
Expand All @@ -210,7 +216,7 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
if (tableSchema != null) {
// protoSchema is generated from tableSchema so their field ordering should match.
fieldSchema = tableSchema.get(field.getIndex());
if (!fieldSchema.getName().toLowerCase().equals(field.getName())) {
if (!fieldSchema.getName().toLowerCase().equals(BigQuerySchemaUtil.getFieldName(field))) {
throw new ValidationException(
"Field at index "
+ field.getIndex()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void testSimpleTypes() throws Exception {

@Test
public void testStructSimple() throws Exception {
final TableFieldSchema StringType =
final TableFieldSchema stringType =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.STRING)
.setMode(TableFieldSchema.Mode.NULLABLE)
Expand All @@ -118,7 +118,7 @@ public void testStructSimple() throws Exception {
.setType(TableFieldSchema.Type.STRUCT)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_field_type")
.addFields(0, StringType)
.addFields(0, stringType)
.build();
final TableSchema tableSchema = TableSchema.newBuilder().addFields(0, tableFieldSchema).build();
final Descriptor descriptor =
Expand Down Expand Up @@ -509,4 +509,32 @@ public void testDescriptorReuseDuringCreation() throws Exception {
assertEquals(descriptorToCount.get("root__reuse_lvl1__reuse_lvl2").intValue(), 3);
isDescriptorEqual(descriptor, ReuseRoot.getDescriptor());
}

@Test
public void testNestedFlexibleFieldName() throws Exception {
final TableFieldSchema stringField =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.STRING)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("str-列")
.build();
final TableFieldSchema intField =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.INT64)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("int-列")
.build();
final TableFieldSchema nestedField =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.STRUCT)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("nested-列")
.addFields(0, intField)
.build();
final TableSchema tableSchema =
TableSchema.newBuilder().addFields(0, stringField).addFields(1, nestedField).build();
final Descriptor descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(tableSchema);
isDescriptorEqual(descriptor, TestNestedFlexibleFieldName.getDescriptor());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1;

import com.google.cloud.bigquery.storage.test.SchemaTest.TestNestedFlexibleFieldName;
import com.google.protobuf.Descriptors.Descriptor;
import java.util.Arrays;
import java.util.List;
import junit.framework.TestCase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class BigQuerySchemaUtilTest extends TestCase {

@Test
public void testIsProtoCompatible() {
List<String> protoCompatibleNames = Arrays.asList("col_1", "name", "_0_");
List<String> protoIncompatibleNames = Arrays.asList("0_col", "()", "列");
tracyz-g marked this conversation as resolved.
Show resolved Hide resolved
protoCompatibleNames.stream()
.forEach(
name -> {
assertTrue(BigQuerySchemaUtil.isProtoCompatible(name));
});
protoIncompatibleNames.stream()
.forEach(
name -> {
assertFalse(BigQuerySchemaUtil.isProtoCompatible(name));
});
}

public void testGeneratePlaceholderFieldName() {
assertEquals("col_c3RyLeWIlw", BigQuerySchemaUtil.generatePlaceholderFieldName("str-列"));
// Base64 url encodes "~/~/" to "fi9-Lw", we replaced - with _ to be proto compatible.
assertEquals("col_fi9_Lw", BigQuerySchemaUtil.generatePlaceholderFieldName("~/~/"));
}

public void testGetFieldName() {
// Test get name from annotations.
Descriptor flexibleDescriptor = TestNestedFlexibleFieldName.getDescriptor();
assertEquals("str-列", BigQuerySchemaUtil.getFieldName(flexibleDescriptor.getFields().get(0)));
assertEquals(
"nested-列", BigQuerySchemaUtil.getFieldName(flexibleDescriptor.getFields().get(1)));

// Test get name without annotations.
Descriptor descriptor = TestNestedFlexibleFieldName.getDescriptor();
assertEquals("int32_value", BigQuerySchemaUtil.getFieldName(descriptor.getFields().get(0)));
assertEquals("int64_value", BigQuerySchemaUtil.getFieldName(descriptor.getFields().get(1)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.cloud.bigquery.storage.test.JsonTest;
import com.google.cloud.bigquery.storage.test.SchemaTest;
import com.google.cloud.bigquery.storage.test.Test.FlexibleType;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
Expand Down Expand Up @@ -208,6 +209,54 @@ public void testSingleAppendSimpleJson() throws Exception {
}
}

@Test
public void testFlexibleColumnAppend() throws Exception {
TableFieldSchema field =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.STRING)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test-列")
.build();
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, field).build();
FlexibleType expectedProto = FlexibleType.newBuilder().setColDGVzdC3LiJc("allen").build();
JSONObject flexible = new JSONObject();
flexible.put("test-列", "allen");
JSONArray jsonArr = new JSONArray();
jsonArr.put(flexible);

try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).setTraceId("test:empty").build()) {

testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());

ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue());
appendFuture.get();
assertEquals(
1,
testBigQueryWrite
.getAppendRequests()
.get(0)
.getProtoRows()
.getRows()
.getSerializedRowsCount());
assertEquals(
testBigQueryWrite
.getAppendRequests()
.get(0)
.getProtoRows()
.getRows()
.getSerializedRows(0),
expectedProto.toByteString());
assertEquals(
testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriter_test:empty");
}
}

@Test
public void testSpecialTypeAppend() throws Exception {
TableFieldSchema field =
Expand Down
14 changes: 14 additions & 0 deletions google-cloud-bigquerystorage/src/test/proto/schemaTest.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ syntax = "proto2";

package com.google.cloud.bigquery.storage.test;

import "google/cloud/bigquery/storage/v1/annotations.proto";

message SupportedTypes {
optional int32 int32_value = 1;
optional int64 int64_value = 2;
Expand Down Expand Up @@ -257,3 +259,15 @@ message AllowUnknownUnsupportedFields {
message FakeFooType {
optional int32 foo = 1;
}

message TestNestedFlexibleFieldName {
optional string col_c3RyLeWIlw = 1
[(.google.cloud.bigquery.storage.v1.column_name) = "str-列"];
optional FlexibleNameField col_bmVzdGVkLeWIlw = 2
[(.google.cloud.bigquery.storage.v1.column_name) = "nested-列"];
}

message FlexibleNameField {
optional int64 col_aW50LeWIlw = 1
[(.google.cloud.bigquery.storage.v1.column_name) = "int-列"];
}
7 changes: 7 additions & 0 deletions google-cloud-bigquerystorage/src/test/proto/test.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ syntax = "proto2";

package com.google.cloud.bigquery.storage.test;

import "google/cloud/bigquery/storage/v1/annotations.proto";

enum TestEnum {
TestEnum0 = 0;
TestEnum1 = 1;
Expand Down Expand Up @@ -80,3 +82,8 @@ message DuplicateType {
optional ComplicateType f3 = 3;
optional ComplicateType f4 = 4;
}

message FlexibleType {
optional string col_dGVzdC3liJc = 1
[(.google.cloud.bigquery.storage.v1.column_name) = "test-列"];
}