Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Coral-Schema] Fix incorrect type derivation for repeated field reference on UDF calls #510

Merged
merged 3 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017-2023 LinkedIn Corporation. All rights reserved.
* Copyright 2017-2024 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand Down Expand Up @@ -140,16 +140,12 @@ public static RelDataType convert(StructTypeInfo structType, final RelDataTypeFa
// The schema of output Struct conforms to https://github.com/trinodb/trino/pull/3483
// except we adopted "integer" for the type of "tag" field instead of "tinyint" in the Trino patch
// for compatibility with other platforms, because Iceberg currently doesn't support the tinyint type.
// When the field count inside UnionTypeInfo is one, we surface the underlying RelDataType instead.

// Note: this is subject to change in the future, pending the discussion in
// https://mail-archives.apache.org/mod_mbox/iceberg-dev/202112.mbox/browser
public static RelDataType convert(UnionTypeInfo unionType, RelDataTypeFactory dtFactory) {
List<RelDataType> fTypes = unionType.getAllUnionObjectTypeInfos().stream()
.map(typeInfo -> convert(typeInfo, dtFactory)).collect(Collectors.toList());
if (fTypes.size() == 1) {
return dtFactory.createTypeWithNullability(fTypes.get(0), true);
}
KevinGe00 marked this conversation as resolved.
Show resolved Hide resolved
List<String> fNames = IntStream.range(0, unionType.getAllUnionObjectTypeInfos().size()).mapToObj(i -> "field" + i)
.collect(Collectors.toList());
fTypes.add(0, dtFactory.createSqlType(SqlTypeName.INTEGER));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2018-2023 LinkedIn Corporation. All rights reserved.
* Copyright 2018-2024 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand Down Expand Up @@ -74,15 +74,6 @@ public RelDataType deriveType(SqlValidator validator, SqlValidatorScope scope, S
if (funcType.isStruct()) {
return funcType.getField(fieldNameStripQuotes(call.operand(1)), false, false).getType();
}

// When the first operand is a SqlBasicCall with a non-struct RelDataType and the second operand is `tag_0`,
// such as `extract_union`(`product`.`value`).`tag_0` or (`extract_union`(`product`.`value`).`id`).`tag_0`,
// derived data type is first operand's RelDataType.
// This strategy ensures that RelDataType derivation remains successful for the specified sqlCalls while maintaining backward compatibility.
// Such SqlCalls are transformed {@link com.linkedin.coral.transformers.SingleUnionFieldReferenceTransformer}
if (FunctionFieldReferenceOperator.fieldNameStripQuotes(call.operand(1)).equalsIgnoreCase("tag_0")) {
return funcType;
}
}
return super.deriveType(validator, scope, call);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2019-2023 LinkedIn Corporation. All rights reserved.
* Copyright 2019-2024 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand Down Expand Up @@ -495,48 +495,58 @@ public RexNode visitRangeRef(RexRangeRef rexRangeRef) {
public RexNode visitFieldAccess(RexFieldAccess rexFieldAccess) {
RexNode referenceExpr = rexFieldAccess.getReferenceExpr();

if (referenceExpr instanceof RexCall
&& ((RexCall) referenceExpr).getOperator() instanceof SqlUserDefinedFunction) {
String oldFieldName = rexFieldAccess.getField().getName();
String suggestNewFieldName = suggestedFieldNames.poll();
String newFieldName = SchemaUtilities.getFieldName(oldFieldName, suggestNewFieldName);

RelDataType fieldType = rexFieldAccess.getType();
boolean isNullable = SchemaUtilities.isFieldNullable((RexCall) referenceExpr, inputSchema);
// TODO: add field documentation
SchemaUtilities.appendField(newFieldName, fieldType, null, fieldAssembler, isNullable);
} else {
Deque<String> innerRecordNames = new LinkedList<>();
while (!(referenceExpr instanceof RexInputRef)) {
if (referenceExpr instanceof RexCall
&& ((RexCall) referenceExpr).getOperator().getName().equalsIgnoreCase("ITEM")) {
// While selecting `int_field` from `array_col:array<struct<int_field:int>>` using `array_col[x].int_field`,
// `rexFieldAccess` is like `ITEM($1, 1).int_field`, we need to set `referenceExpr` to be the first operand (`$1`) of `ITEM` function
referenceExpr = ((RexCall) referenceExpr).getOperands().get(0);
} else if (referenceExpr instanceof RexFieldAccess) {
// While selecting `int_field` from `struct_col:struct<inner_struct_col:struct<int_field:int>>` using `struct_col.inner_struct_col.int_field`,
// `rexFieldAccess` is like `$3.inner_struct_col.int_field`, we need to set `referenceExpr` to be the expr (`$3`) of itself.
// Besides, we need to store the field name (`inner_struct_col`) in `innerRecordNames` so that we can retrieve the correct inner struct from `topSchema` afterwards
innerRecordNames.push(((RexFieldAccess) referenceExpr).getField().getName());
referenceExpr = ((RexFieldAccess) referenceExpr).getReferenceExpr();
} else {
return super.visitFieldAccess(rexFieldAccess);
}
Deque<String> innerRecordNames = new LinkedList<>();
while (!(referenceExpr instanceof RexInputRef)) {
if (referenceExpr instanceof RexCall
&& ((RexCall) referenceExpr).getOperator().getName().equalsIgnoreCase("ITEM")) {
// While selecting `int_field` from `array_col:array<struct<int_field:int>>` using `array_col[x].int_field`,
// `rexFieldAccess` is like `ITEM($1, 1).int_field`, we need to set `referenceExpr` to be the first operand (`$1`) of `ITEM` function
referenceExpr = ((RexCall) referenceExpr).getOperands().get(0);
} else if (referenceExpr instanceof RexCall
&& ((RexCall) referenceExpr).getOperator() instanceof SqlUserDefinedFunction) {
// UDF calls could potentially be doubly (or more) field-referenced, for example, `extract_union(baz).single.tag_0`
// where baz is a struct containing a uniontype field. In this case, we simply need to use the derived type of the entire
// call. Note that this also takes care of the simple single-layer field reference on a UDF call.
handleUDFFieldAccess(rexFieldAccess, (RexCall) referenceExpr);
return rexFieldAccess;
} else if (referenceExpr instanceof RexFieldAccess) {
// While selecting `int_field` from `struct_col:struct<inner_struct_col:struct<int_field:int>>` using `struct_col.inner_struct_col.int_field`,
// `rexFieldAccess` is like `$3.inner_struct_col.int_field`, we need to set `referenceExpr` to be the expr (`$3`) of itself.
// Besides, we need to store the field name (`inner_struct_col`) in `innerRecordNames` so that we can retrieve the correct inner struct from `topSchema` afterwards
innerRecordNames.push(((RexFieldAccess) referenceExpr).getField().getName());
referenceExpr = ((RexFieldAccess) referenceExpr).getReferenceExpr();
} else {
return super.visitFieldAccess(rexFieldAccess);
}

String oldFieldName = rexFieldAccess.getField().getName();
String suggestNewFieldName = suggestedFieldNames.poll();
String newFieldName = SchemaUtilities.getFieldName(oldFieldName, suggestNewFieldName);
Schema topSchema = inputSchema.getFields().get(((RexInputRef) referenceExpr).getIndex()).schema();

Schema.Field accessedField = getFieldFromTopSchema(topSchema, oldFieldName, innerRecordNames);
assert accessedField != null;
SchemaUtilities.appendField(newFieldName, accessedField, fieldAssembler);
}

handleFieldAccess(rexFieldAccess, (RexInputRef) referenceExpr, innerRecordNames);
return rexFieldAccess;
}

/**
 * Appends to {@code fieldAssembler} the Avro field addressed by a field access whose
 * reference expression has been unwrapped down to an input column reference.
 *
 * @param rexFieldAccess the field access being visited; its field name is used for the lookup
 * @param referenceExpr the input reference whose index selects the top-level field of {@code inputSchema}
 * @param innerRecordNames names of the intermediate fields collected by the caller, passed through
 *                         to {@code getFieldFromTopSchema} to locate the accessed field
 */
private void handleFieldAccess(RexFieldAccess rexFieldAccess, RexInputRef referenceExpr,
    Deque<String> innerRecordNames) {
  final String accessedName = rexFieldAccess.getField().getName();
  // Take the next entry (if any) from suggestedFieldNames and let SchemaUtilities decide the output name.
  final String outputName = SchemaUtilities.getFieldName(accessedName, suggestedFieldNames.poll());

  // Start from the schema of the referenced input column and resolve the accessed field from there.
  final Schema rootSchema = inputSchema.getFields().get(referenceExpr.getIndex()).schema();
  final Schema.Field resolvedField = getFieldFromTopSchema(rootSchema, accessedName, innerRecordNames);
  // NOTE(review): this is only checked when the JVM runs with assertions enabled (-ea).
  assert resolvedField != null;

  SchemaUtilities.appendField(outputName, resolvedField, fieldAssembler);
}

/**
 * Appends to {@code fieldAssembler} a field for a field access whose reference expression is a UDF call.
 * The field type is the type of the field access itself, and nullability is computed by
 * {@code SchemaUtilities.isFieldNullable} from the UDF call and {@code inputSchema}.
 *
 * @param rexFieldAccess the field access being visited; supplies the original field name and the field type
 * @param referenceExpr the UDF call that the field access is applied to
 */
private void handleUDFFieldAccess(RexFieldAccess rexFieldAccess, RexCall referenceExpr) {
  final String accessedName = rexFieldAccess.getField().getName();
  // Take the next entry (if any) from suggestedFieldNames and let SchemaUtilities decide the output name.
  final String outputName = SchemaUtilities.getFieldName(accessedName, suggestedFieldNames.poll());

  // TODO: add field documentation
  SchemaUtilities.appendField(outputName, rexFieldAccess.getType(), null, fieldAssembler,
      SchemaUtilities.isFieldNullable(referenceExpr, inputSchema));
}

@Override
public RexNode visitSubQuery(RexSubQuery rexSubQuery) {
// TODO: implement this method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ private static void initializeTables() {

executeQuery("DROP TABLE IF EXISTS basedecimal");
executeQuery("CREATE TABLE IF NOT EXISTS basedecimal(decimal_col decimal(2,1))");

executeQuery(
"CREATE TABLE IF NOT EXISTS single_uniontypes(single uniontype<string>, struct_col struct<single:uniontype<string>>)");
}

private static void initializeUdfs() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2019-2023 LinkedIn Corporation. All rights reserved.
* Copyright 2019-2024 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand Down Expand Up @@ -560,6 +560,17 @@ public void testNullabliltyExtractUnionUDF() {
Assert.assertEquals(actual.toString(true), TestUtils.loadSchema("testNullabilityExtractUnionUDF-expected.avsc"));
}

/**
 * Converts a query that field-references {@code extract_union} calls (both one and two levels deep)
 * over the {@code single_uniontypes} table and compares the resulting Avro schema with the
 * expected schema resource.
 */
@Test
public void testSingleUnionFieldReference() {
  final String query =
      "select extract_union(struct_col).single.tag_0 as single_in_struct, extract_union(single).tag_0 as single from single_uniontypes";

  final Schema actual = ViewToAvroSchemaConverter.create(hiveMetastoreClient).toAvroSchema(query);

  final String actualJson = actual.toString(true);
  final String expectedJson = TestUtils.loadSchema("testSingleUnionFieldReference-expected.avsc");
  Assert.assertEquals(actualJson, expectedJson);
}

@Test(enabled = false)
public void testRenameToLowercase() {
String viewSql = "CREATE VIEW v AS " + "SELECT bc.Id AS id, bc.Array_Col AS array_col " + "FROM basecomplex bc "
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"type" : "record",
"name" : "SingleUniontypes",
"namespace" : "default.single_uniontypes",
"fields" : [ {
"name" : "single_in_struct",
"type" : [ "null", "string" ]
}, {
"name" : "single",
"type" : [ "null", "string" ]
} ]
}
Loading