From 7e1fe395df0831a9d65bf4b6126562252a7a7bf2 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Thu, 14 Nov 2024 18:36:02 +0800 Subject: [PATCH] [flink] Support updating row type nested in array/map in Flink (#4528) --- .../paimon/casting/CastElementGetter.java | 41 ++++ .../paimon/casting/CastFieldGetter.java | 3 + .../apache/paimon/casting/CastedArray.java | 201 ++++++++++++++++++ .../org/apache/paimon/casting/CastedMap.java | 70 ++++++ .../org/apache/paimon/casting/CastedRow.java | 2 - .../paimon/schema/SchemaEvolutionUtil.java | 99 +++++---- .../apache/paimon/schema/SchemaManager.java | 44 +++- .../paimon/schema/SchemaManagerTest.java | 50 ++++- .../org/apache/paimon/flink/FlinkCatalog.java | 41 +++- .../paimon/flink/SchemaChangeITCase.java | 66 +++++- 10 files changed, 562 insertions(+), 55 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/casting/CastElementGetter.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/casting/CastedMap.java diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/CastElementGetter.java b/paimon-common/src/main/java/org/apache/paimon/casting/CastElementGetter.java new file mode 100644 index 000000000000..b8a91f572a35 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastElementGetter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.casting; + +import org.apache.paimon.data.InternalArray; + +/** Get element from array and cast it according to specific {@link CastExecutor}. */ +public class CastElementGetter { + + private final InternalArray.ElementGetter elementGetter; + private final CastExecutor castExecutor; + + @SuppressWarnings("unchecked") + public CastElementGetter( + InternalArray.ElementGetter elementGetter, CastExecutor castExecutor) { + this.elementGetter = elementGetter; + this.castExecutor = (CastExecutor) castExecutor; + } + + @SuppressWarnings("unchecked") + public V getElementOrNull(InternalArray array, int pos) { + Object value = elementGetter.getElementOrNull(array, pos); + return value == null ? null : (V) castExecutor.cast(value); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/CastFieldGetter.java b/paimon-common/src/main/java/org/apache/paimon/casting/CastFieldGetter.java index 02168300a842..208ef5f30f5b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/casting/CastFieldGetter.java +++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastFieldGetter.java @@ -24,14 +24,17 @@ * Get field value from row with given pos and cast it according to specific {@link CastExecutor}. */ public class CastFieldGetter { + private final InternalRow.FieldGetter fieldGetter; private final CastExecutor castExecutor; + @SuppressWarnings("unchecked") public CastFieldGetter(InternalRow.FieldGetter fieldGetter, CastExecutor castExecutor) { this.fieldGetter = fieldGetter; this.castExecutor = (CastExecutor) castExecutor; } + @SuppressWarnings("unchecked") public V getFieldOrNull(InternalRow row) { Object value = fieldGetter.getFieldOrNull(row); return value == null ? null : (V) castExecutor.cast(value); diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java b/paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java new file mode 100644 index 000000000000..778b11d1f887 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.casting; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; + +/** + * An implementation of {@link InternalArray} which provides a casted view of the underlying {@link + * InternalArray}. + * + *

It reads data from underlying {@link InternalArray} according to source logical type and casts + * it with specific {@link CastExecutor}. + */ +public class CastedArray implements InternalArray { + + private final CastElementGetter castElementGetter; + private InternalArray array; + + protected CastedArray(CastElementGetter castElementGetter) { + this.castElementGetter = castElementGetter; + } + + /** + * Replaces the underlying {@link InternalArray} backing this {@link CastedArray}. + * + *

This method replaces the array in place and does not return a new object. This is done for + * performance reasons. + */ + public static CastedArray from(CastElementGetter castElementGetter) { + return new CastedArray(castElementGetter); + } + + public CastedArray replaceArray(InternalArray array) { + this.array = array; + return this; + } + + @Override + public int size() { + return array.size(); + } + + @Override + public boolean[] toBooleanArray() { + boolean[] result = new boolean[size()]; + for (int i = 0; i < result.length; i++) { + result[i] = castElementGetter.getElementOrNull(array, i); + } + return result; + } + + @Override + public byte[] toByteArray() { + byte[] result = new byte[size()]; + for (int i = 0; i < result.length; i++) { + result[i] = castElementGetter.getElementOrNull(array, i); + } + return result; + } + + @Override + public short[] toShortArray() { + short[] result = new short[size()]; + for (int i = 0; i < result.length; i++) { + result[i] = castElementGetter.getElementOrNull(array, i); + } + return result; + } + + @Override + public int[] toIntArray() { + int[] result = new int[size()]; + for (int i = 0; i < result.length; i++) { + result[i] = castElementGetter.getElementOrNull(array, i); + } + return result; + } + + @Override + public long[] toLongArray() { + long[] result = new long[size()]; + for (int i = 0; i < result.length; i++) { + result[i] = castElementGetter.getElementOrNull(array, i); + } + return result; + } + + @Override + public float[] toFloatArray() { + float[] result = new float[size()]; + for (int i = 0; i < result.length; i++) { + result[i] = castElementGetter.getElementOrNull(array, i); + } + return result; + } + + @Override + public double[] toDoubleArray() { + double[] result = new double[size()]; + for (int i = 0; i < result.length; i++) { + result[i] = castElementGetter.getElementOrNull(array, i); + } + return result; + } + + @Override + public boolean isNullAt(int pos) { + return castElementGetter.getElementOrNull(array, pos) == null; + } + + @Override + public boolean getBoolean(int pos) { + return castElementGetter.getElementOrNull(array, pos); + } + + @Override + public byte getByte(int pos) { + return castElementGetter.getElementOrNull(array, pos); + } + + @Override + public short getShort(int pos) { + return castElementGetter.getElementOrNull(array, pos); + } + + @Override + public int getInt(int pos) { + return castElementGetter.getElementOrNull(array, pos); + } + + @Override + public long getLong(int pos) { + return castElementGetter.getElementOrNull(array, pos); + } + + @Override + public float getFloat(int pos) { + return castElementGetter.getElementOrNull(array, pos); + } + + @Override + public double getDouble(int pos) { + return castElementGetter.getElementOrNull(array, pos); + } + + @Override + public BinaryString getString(int pos) { + return castElementGetter.getElementOrNull(array, pos); + } + + @Override + public Decimal getDecimal(int pos, int precision, int scale) { + return castElementGetter.getElementOrNull(array, pos); + } + + @Override + public Timestamp getTimestamp(int pos, int precision) { + return castElementGetter.getElementOrNull(array, pos); + } + + @Override + public byte[] getBinary(int pos) { + return castElementGetter.getElementOrNull(array, pos); + } + + @Override + public InternalArray getArray(int pos) { + return castElementGetter.getElementOrNull(array, pos); + } + + @Override + public InternalMap getMap(int pos) { + return castElementGetter.getElementOrNull(array, pos); + } + + @Override + public InternalRow getRow(int pos, int numFields) { + return castElementGetter.getElementOrNull(array, pos); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/CastedMap.java b/paimon-common/src/main/java/org/apache/paimon/casting/CastedMap.java new file mode 100644 index 000000000000..4068407ca71c --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastedMap.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.casting; + +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; + +/** + * An implementation of {@link InternalMap} which provides a casted view of the underlying {@link + * InternalMap}. + * + *

It reads data from underlying {@link InternalMap} according to source logical type and casts + * it with specific {@link CastExecutor}. + */ +public class CastedMap implements InternalMap { + + private final CastedArray castedValueArray; + private InternalMap map; + + protected CastedMap(CastElementGetter castValueGetter) { + this.castedValueArray = CastedArray.from(castValueGetter); + } + + /** + * Replaces the underlying {@link InternalMap} backing this {@link CastedMap}. + * + *

This method replaces the map in place and does not return a new object. This is done for + * performance reasons. + */ + public static CastedMap from(CastElementGetter castValueGetter) { + return new CastedMap(castValueGetter); + } + + public CastedMap replaceMap(InternalMap map) { + this.castedValueArray.replaceArray(map.valueArray()); + this.map = map; + return this; + } + + @Override + public int size() { + return map.size(); + } + + @Override + public InternalArray keyArray() { + return map.keyArray(); + } + + @Override + public InternalArray valueArray() { + return castedValueArray; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java b/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java index 25c5744255ef..f9216d10b3a8 100644 --- a/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java @@ -34,8 +34,6 @@ * *

It reads data from underlying {@link InternalRow} according to source logical type and casts * it with specific {@link CastExecutor}. - * - *

Note: This class supports only top-level castings, not nested castings. */ public class CastedRow implements InternalRow { diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java index b5d730707359..0ae2798c29e0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java @@ -19,10 +19,15 @@ package org.apache.paimon.schema; import org.apache.paimon.KeyValue; +import org.apache.paimon.casting.CastElementGetter; import org.apache.paimon.casting.CastExecutor; import org.apache.paimon.casting.CastExecutors; import org.apache.paimon.casting.CastFieldGetter; +import org.apache.paimon.casting.CastedArray; +import org.apache.paimon.casting.CastedMap; import org.apache.paimon.casting.CastedRow; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.predicate.LeafPredicate; import org.apache.paimon.predicate.Predicate; @@ -31,7 +36,6 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.MapType; -import org.apache.paimon.types.MultisetType; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.InternalRowUtils; import org.apache.paimon.utils.ProjectedRow; @@ -372,6 +376,7 @@ private static CastFieldGetter[] createCastFieldGetterMapping( List tableFields, List dataFields, int[] indexMapping) { CastFieldGetter[] converterMapping = new CastFieldGetter[tableFields.size()]; boolean castExist = false; + for (int i = 0; i < tableFields.size(); i++) { int dataIndex = indexMapping == null ? i : indexMapping[i]; if (dataIndex < 0) { @@ -380,53 +385,39 @@ private static CastFieldGetter[] createCastFieldGetterMapping( } else { DataField tableField = tableFields.get(i); DataField dataField = dataFields.get(dataIndex); - if (dataField.type().equalsIgnoreNullable(tableField.type())) { - // Create getter with index i and projected row data will convert to underlying - // data - converterMapping[i] = - new CastFieldGetter( - InternalRowUtils.createNullCheckingFieldGetter( - dataField.type(), i), - CastExecutors.identityCastExecutor()); - } else { - // TODO support column type evolution in nested type - checkState( - !(tableField.type() instanceof MapType - || dataField.type() instanceof ArrayType - || dataField.type() instanceof MultisetType), - "Only support column type evolution in atomic and row data type."); - - CastExecutor castExecutor; - if (tableField.type() instanceof RowType - && dataField.type() instanceof RowType) { - castExecutor = - createRowCastExecutor( - (RowType) dataField.type(), (RowType) tableField.type()); - } else { - castExecutor = CastExecutors.resolve(dataField.type(), tableField.type()); - } - checkNotNull( - castExecutor, - "Cannot cast from type " - + dataField.type() - + " to type " - + tableField.type()); - - // Create getter with index i and projected row data will convert to underlying - // data - converterMapping[i] = - new CastFieldGetter( - InternalRowUtils.createNullCheckingFieldGetter( - dataField.type(), i), - castExecutor); + if (!dataField.type().equalsIgnoreNullable(tableField.type())) { castExist = true; } + + // Create getter with index i and projected row data will convert to underlying data + converterMapping[i] = + new CastFieldGetter( + InternalRowUtils.createNullCheckingFieldGetter(dataField.type(), i), + createCastExecutor(dataField.type(), tableField.type())); } } return castExist ? converterMapping : null; } + private static CastExecutor createCastExecutor(DataType inputType, DataType targetType) { + if (targetType.equalsIgnoreNullable(inputType)) { + return CastExecutors.identityCastExecutor(); + } else if (inputType instanceof RowType && targetType instanceof RowType) { + return createRowCastExecutor((RowType) inputType, (RowType) targetType); + } else if (inputType instanceof ArrayType && targetType instanceof ArrayType) { + return createArrayCastExecutor((ArrayType) inputType, (ArrayType) targetType); + } else if (inputType instanceof MapType && targetType instanceof MapType) { + return createMapCastExecutor((MapType) inputType, (MapType) targetType); + } else { + return checkNotNull( + CastExecutors.resolve(inputType, targetType), + "Cannot cast from type %s to type %s", + inputType, + targetType); + } + } + private static CastExecutor createRowCastExecutor( RowType inputType, RowType targetType) { int[] indexMapping = createIndexMapping(targetType.getFields(), inputType.getFields()); @@ -446,4 +437,32 @@ private static CastExecutor createRowCastExecutor( return value; }; } + + private static CastExecutor createArrayCastExecutor( + ArrayType inputType, ArrayType targetType) { + CastElementGetter castElementGetter = + new CastElementGetter( + InternalArray.createElementGetter(inputType.getElementType()), + createCastExecutor( + inputType.getElementType(), targetType.getElementType())); + + CastedArray castedArray = CastedArray.from(castElementGetter); + return castedArray::replaceArray; + } + + private static CastExecutor createMapCastExecutor( + MapType inputType, MapType targetType) { + checkState( + inputType.getKeyType().equals(targetType.getKeyType()), + "Cannot cast map type %s to map type %s, because they have different key types.", + inputType.getKeyType(), + targetType.getKeyType()); + CastElementGetter castElementGetter = + new CastElementGetter( + InternalArray.createElementGetter(inputType.getValueType()), + createCastExecutor(inputType.getValueType(), targetType.getValueType())); + + CastedMap castedMap = CastedMap.from(castElementGetter); + return castedMap::replaceMap; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 86e365a88f83..a84348810b99 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -37,9 +37,11 @@ import org.apache.paimon.schema.SchemaChange.UpdateColumnType; import org.apache.paimon.schema.SchemaChange.UpdateComment; import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeCasts; +import org.apache.paimon.types.MapType; import org.apache.paimon.types.ReassignFieldId; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BranchManager; @@ -636,17 +638,17 @@ public void updateIntermediateColumn(List newFields, int depth) continue; } + String fullFieldName = + String.join(".", Arrays.asList(updateFieldNames).subList(0, depth + 1)); List nestedFields = - new ArrayList<>( - ((org.apache.paimon.types.RowType) field.type()).getFields()); + new ArrayList<>(extractRowType(field.type(), fullFieldName).getFields()); updateIntermediateColumn(nestedFields, depth + 1); newFields.set( i, new DataField( field.id(), field.name(), - new org.apache.paimon.types.RowType( - field.type().isNullable(), nestedFields), + wrapNewRowType(field.type(), nestedFields), field.description())); return; } @@ -656,6 +658,40 @@ public void updateIntermediateColumn(List newFields, int depth) String.join(".", Arrays.asList(updateFieldNames).subList(0, depth + 1))); } + private RowType extractRowType(DataType type, String fullFieldName) { + switch (type.getTypeRoot()) { + case ROW: + return (RowType) type; + case ARRAY: + return extractRowType(((ArrayType) type).getElementType(), fullFieldName); + case MAP: + return extractRowType(((MapType) type).getValueType(), fullFieldName); + default: + throw new IllegalArgumentException( + fullFieldName + " is not a structured type."); + } + } + + private DataType wrapNewRowType(DataType type, List nestedFields) { + switch (type.getTypeRoot()) { + case ROW: + return new RowType(type.isNullable(), nestedFields); + case ARRAY: + return new ArrayType( + type.isNullable(), + wrapNewRowType(((ArrayType) type).getElementType(), nestedFields)); + case MAP: + MapType mapType = (MapType) type; + return new MapType( + type.isNullable(), + mapType.getKeyType(), + wrapNewRowType(mapType.getValueType(), nestedFields)); + default: + throw new IllegalStateException( + "Trying to wrap a row type in " + type + ". This is unexpected."); + } + } + protected abstract void updateLastColumn(List newFields, String fieldName) throws Catalog.ColumnNotExistException, Catalog.ColumnAlreadyExistException; diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index 088cb72f92e6..f0d654369965 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -31,6 +31,7 @@ import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -690,7 +691,7 @@ public void testUpdateNestedColumnType() throws Exception { SchemaChange updateColumnType = SchemaChange.updateColumnType( - new String[] {"v", "f2", "f1"}, DataTypes.BIGINT(), true); + new String[] {"v", "f2", "f1"}, DataTypes.BIGINT(), false); manager.commitChanges(updateColumnType); innerType = @@ -708,8 +709,53 @@ public void testUpdateNestedColumnType() throws Exception { SchemaChange middleColumnNotExistUpdateColumnType = SchemaChange.updateColumnType( - new String[] {"v", "invalid", "f1"}, DataTypes.BIGINT(), true); + new String[] {"v", "invalid", "f1"}, DataTypes.BIGINT(), false); assertThatCode(() -> manager.commitChanges(middleColumnNotExistUpdateColumnType)) .hasMessageContaining("Column v.invalid does not exist"); } + + @Test + public void testUpdateRowTypeInArrayAndMap() throws Exception { + RowType innerType = + RowType.of( + new DataField(2, "f1", DataTypes.INT()), + new DataField(3, "f2", DataTypes.BIGINT())); + RowType outerType = + RowType.of( + new DataField(0, "k", DataTypes.INT()), + new DataField( + 1, "v", new ArrayType(new MapType(DataTypes.INT(), innerType)))); + + Schema schema = + new Schema( + outerType.getFields(), + Collections.singletonList("k"), + Collections.emptyList(), + new HashMap<>(), + ""); + SchemaManager manager = new SchemaManager(LocalFileIO.create(), path); + manager.createTable(schema); + + SchemaChange addColumn = + SchemaChange.addColumn( + new String[] {"v", "f3"}, + DataTypes.STRING(), + null, + SchemaChange.Move.first("f3")); + SchemaChange dropColumn = SchemaChange.dropColumn(new String[] {"v", "f2"}); + SchemaChange updateColumnType = + SchemaChange.updateColumnType(new String[] {"v", "f1"}, DataTypes.BIGINT(), false); + manager.commitChanges(addColumn, dropColumn, updateColumnType); + + innerType = + RowType.of( + new DataField(4, "f3", DataTypes.STRING()), + new DataField(2, "f1", DataTypes.BIGINT())); + outerType = + RowType.of( + new DataField(0, "k", DataTypes.INT()), + new DataField( + 1, "v", new ArrayType(new MapType(DataTypes.INT(), innerType)))); + assertThat(manager.latest().get().logicalRowType()).isEqualTo(outerType); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index ae30fa569d59..09fc0328ef65 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -673,14 +673,13 @@ private void generateNestedColumnUpdates( org.apache.paimon.types.DataType oldType, org.apache.paimon.types.DataType newType, List schemaChanges) { + String joinedNames = String.join(".", fieldNames); if (oldType.getTypeRoot() == DataTypeRoot.ROW) { Preconditions.checkArgument( newType.getTypeRoot() == DataTypeRoot.ROW, - "Column " - + String.join(".", fieldNames) - + " can only be updated to row type, and cannot be updated to " - + newType - + " type"); + "Column %s can only be updated to row type, and cannot be updated to %s type", + joinedNames, + newType.getTypeRoot()); org.apache.paimon.types.RowType oldRowType = (org.apache.paimon.types.RowType) oldType; org.apache.paimon.types.RowType newRowType = (org.apache.paimon.types.RowType) newType; @@ -699,7 +698,7 @@ private void generateNestedColumnUpdates( lastIdx < idx, "Order of existing fields in column %s must be kept the same. " + "However, field %s and %s have changed their orders.", - String.join(".", fieldNames), + joinedNames, lastFieldName, name); lastIdx = idx; @@ -751,6 +750,36 @@ private void generateNestedColumnUpdates( fullFieldNames, oldField.type(), field.type(), schemaChanges); } } + } else if (oldType.getTypeRoot() == DataTypeRoot.ARRAY) { + Preconditions.checkArgument( + newType.getTypeRoot() == DataTypeRoot.ARRAY, + "Column %s can only be updated to array type, and cannot be updated to %s type", + joinedNames, + newType); + generateNestedColumnUpdates( + fieldNames, + ((org.apache.paimon.types.ArrayType) oldType).getElementType(), + ((org.apache.paimon.types.ArrayType) newType).getElementType(), + schemaChanges); + } else if (oldType.getTypeRoot() == DataTypeRoot.MAP) { + Preconditions.checkArgument( + newType.getTypeRoot() == DataTypeRoot.MAP, + "Column %s can only be updated to map type, and cannot be updated to %s type", + joinedNames, + newType); + org.apache.paimon.types.MapType oldMapType = (org.apache.paimon.types.MapType) oldType; + org.apache.paimon.types.MapType newMapType = (org.apache.paimon.types.MapType) newType; + Preconditions.checkArgument( + oldMapType.getKeyType().equals(newMapType.getKeyType()), + "Cannot update key type of column %s from %s type to %s type", + joinedNames, + oldMapType.getKeyType(), + newMapType.getKeyType()); + generateNestedColumnUpdates( + fieldNames, + oldMapType.getValueType(), + newMapType.getValueType(), + schemaChanges); } else { if (!oldType.equalsIgnoreNullable(newType)) { schemaChanges.add( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index a2ef1d5c8ace..a8e8332156b3 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -29,6 +29,7 @@ import org.junit.jupiter.params.provider.ValueSource; import java.time.format.DateTimeFormatter; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -1133,6 +1134,69 @@ public void testUpdateNestedColumn(String formatType) { assertThatCode(() -> sql("ALTER TABLE T MODIFY (v ROW(f1 BIGINT, f2 INT, f3 STRING))")) .hasRootCauseMessage( - "Column v.f2 can only be updated to row type, and cannot be updated to INT type"); + "Column v.f2 can only be updated to row type, and cannot be updated to INTEGER type"); + } + + @ParameterizedTest() + @ValueSource(strings = {"orc", "avro", "parquet"}) + public void testUpdateRowInArrayAndMap(String formatType) { + sql( + "CREATE TABLE T " + + "( k INT, v1 ARRAY, v2 MAP, PRIMARY KEY (k) NOT ENFORCED ) " + + "WITH ( 'bucket' = '1', 'file.format' = '" + + formatType + + "' )"); + sql( + "INSERT INTO T VALUES " + + "(1, ARRAY[ROW(100, 'apple'), ROW(101, 'banana')], MAP[100, ROW('cat', 1000), 101, ROW('dog', 1001)]), " + + "(2, ARRAY[ROW(200, 'pear'), ROW(201, 'grape')], MAP[200, ROW('tiger', 2000), 201, ROW('wolf', 2001)])"); + + Map map1 = new HashMap<>(); + map1.put(100, Row.of("cat", 1000)); + map1.put(101, Row.of("dog", 1001)); + Map map2 = new HashMap<>(); + map2.put(200, Row.of("tiger", 2000)); + map2.put(201, Row.of("wolf", 2001)); + assertThat(sql("SELECT * FROM T")) + .containsExactlyInAnyOrder( + Row.of(1, new Row[] {Row.of(100, "apple"), Row.of(101, "banana")}, map1), + Row.of(2, new Row[] {Row.of(200, "pear"), Row.of(201, "grape")}, map2)); + + sql( + "ALTER TABLE T MODIFY (v1 ARRAY, v2 MAP)"); + sql( + "INSERT INTO T VALUES " + + "(1, ARRAY[ROW(1000000000000, 'apple', 'A'), ROW(1000000000001, 'banana', 'B')], MAP[100, ROW(1000.0, 1000), 101, ROW(1001.0, 1001)]), " + + "(3, ARRAY[ROW(3000000000000, 'mango', 'M'), ROW(3000000000001, 'cherry', 'C')], MAP[300, ROW(3000.0, 3000), 301, ROW(3001.0, 3001)])"); + + map1.clear(); + map1.put(100, Row.of(1000.0, 1000)); + map1.put(101, Row.of(1001.0, 1001)); + map2.clear(); + map2.put(200, Row.of(null, 2000)); + map2.put(201, Row.of(null, 2001)); + Map map3 = new HashMap<>(); + map3.put(300, Row.of(3000.0, 3000)); + map3.put(301, Row.of(3001.0, 3001)); + assertThat(sql("SELECT v2, v1, k FROM T")) + .containsExactlyInAnyOrder( + Row.of( + map1, + new Row[] { + Row.of(1000000000000L, "apple", "A"), + Row.of(1000000000001L, "banana", "B") + }, + 1), + Row.of( + map2, + new Row[] {Row.of(200L, "pear", null), Row.of(201L, "grape", null)}, + 2), + Row.of( + map3, + new Row[] { + Row.of(3000000000000L, "mango", "M"), + Row.of(3000000000001L, "cherry", "C") + }, + 3)); } }