diff --git a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java index b54f0688715..da869d65f6a 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java +++ b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.commons.lang3.tuple.Pair; import org.apache.gravitino.EntityStore; import org.apache.gravitino.GravitinoEnv; @@ -499,8 +500,8 @@ private Table internalCreateTable( .withCreateTime(Instant.now()) .build(); List columnEntityList = - Arrays.stream(columns) - .map(c -> ColumnEntity.toColumnEntity(c, idGenerator.nextId(), audit)) + IntStream.range(0, columns.length) + .mapToObj(i -> ColumnEntity.toColumnEntity(columns[i], i, idGenerator.nextId(), audit)) .collect(Collectors.toList()); TableEntity tableEntity = @@ -531,13 +532,14 @@ private Table internalCreateTable( private List toColumnEntities(Column[] columns, AuditInfo audit) { return columns == null ? Collections.emptyList() - : Arrays.stream(columns) - .map(c -> ColumnEntity.toColumnEntity(c, idGenerator.nextId(), audit)) + : IntStream.range(0, columns.length) + .mapToObj(i -> ColumnEntity.toColumnEntity(columns[i], i, idGenerator.nextId(), audit)) .collect(Collectors.toList()); } - private boolean isSameColumn(Column left, ColumnEntity right) { + private boolean isSameColumn(Column left, int columnPosition, ColumnEntity right) { return Objects.equal(left.name(), right.name()) + && columnPosition == right.position() && Objects.equal(left.dataType(), right.dataType()) && Objects.equal(left.comment(), right.comment()) && left.nullable() == right.nullable() @@ -554,11 +556,12 @@ private Pair> updateColumnsIfNecessary( return Pair.of(false, Collections.emptyList()); } - Map columnsFromCatalogTable = + Map> columnsFromCatalogTable = tableFromCatalog.columns() == null ? Collections.emptyMap() - : Arrays.stream(tableFromCatalog.columns()) - .collect(Collectors.toMap(Column::name, Function.identity())); + : IntStream.range(0, tableFromCatalog.columns().length) + .mapToObj(i -> Pair.of(i, tableFromCatalog.columns()[i])) + .collect(Collectors.toMap(p -> p.getRight().name(), Function.identity())); Map columnsFromTableEntity = tableFromGravitino.columns() == null ? Collections.emptyMap() @@ -569,25 +572,27 @@ private Pair> updateColumnsIfNecessary( List columnsToInsert = Lists.newArrayList(); boolean columnsNeedsUpdate = false; for (Map.Entry entry : columnsFromTableEntity.entrySet()) { - Column column = columnsFromCatalogTable.get(entry.getKey()); - if (column == null) { + Pair columnPair = columnsFromCatalogTable.get(entry.getKey()); + if (columnPair == null) { LOG.debug( "Column {} is not found in the table from underlying source, it will be removed" + " from the table entity", entry.getKey()); columnsNeedsUpdate = true; - } else if (!isSameColumn(column, entry.getValue())) { + } else if (!isSameColumn(columnPair.getRight(), columnPair.getLeft(), entry.getValue())) { // If the column need to be updated, we create a new ColumnEntity with the same id LOG.debug( "Column {} is found in the table from underlying source, but it is different " + "from the one in the table entity, it will be updated", entry.getKey()); + Column column = columnPair.getRight(); ColumnEntity updatedColumnEntity = ColumnEntity.builder() .withId(entry.getValue().id()) .withName(column.name()) + .withPosition(columnPair.getLeft()) .withDataType(column.dataType()) .withComment(column.comment()) .withNullable(column.nullable()) @@ -612,7 +617,7 @@ private Pair> updateColumnsIfNecessary( } // Check if there are new columns in the table from the underlying source - for (Map.Entry entry : columnsFromCatalogTable.entrySet()) { + for (Map.Entry> entry : columnsFromCatalogTable.entrySet()) { if (!columnsFromTableEntity.containsKey(entry.getKey())) { LOG.debug( "Column {} is found in the table from underlying source but not in the table " @@ -620,7 +625,8 @@ private Pair> updateColumnsIfNecessary( entry.getKey()); ColumnEntity newColumnEntity = ColumnEntity.toColumnEntity( - entry.getValue(), + entry.getValue().getRight(), + entry.getValue().getLeft(), idGenerator.nextId(), AuditInfo.builder() .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) diff --git a/core/src/main/java/org/apache/gravitino/meta/ColumnEntity.java b/core/src/main/java/org/apache/gravitino/meta/ColumnEntity.java index 37904426013..5e68e48744f 100644 --- a/core/src/main/java/org/apache/gravitino/meta/ColumnEntity.java +++ b/core/src/main/java/org/apache/gravitino/meta/ColumnEntity.java @@ -41,6 +41,8 @@ public class ColumnEntity implements Entity, Auditable { public static final Field ID = Field.required("id", Long.class, "The column's unique identifier"); public static final Field NAME = Field.required("name", String.class, "The column's name"); + public static final Field POSITION = + Field.required("position", Integer.class, "The column's position"); public static final Field TYPE = Field.required("dataType", Type.class, "The column's data type"); public static final Field COMMENT = Field.optional("comment", String.class, "The column's comment"); @@ -57,6 +59,8 @@ public class ColumnEntity implements Entity, Auditable { private String name; + private Integer position; + private Type dataType; private String comment; @@ -76,6 +80,7 @@ public Map fields() { Map fields = Maps.newHashMap(); fields.put(ID, id); fields.put(NAME, name); + fields.put(POSITION, position); fields.put(TYPE, dataType); fields.put(COMMENT, comment); fields.put(NULLABLE, nullable); @@ -104,6 +109,10 @@ public String name() { return name; } + public Integer position() { + return position; + } + public Type dataType() { return dataType; } @@ -132,6 +141,7 @@ public boolean equals(Object o) { ColumnEntity that = (ColumnEntity) o; return Objects.equal(id, that.id) && Objects.equal(name, that.name) + && Objects.equal(position, that.position) && Objects.equal(dataType, that.dataType) && Objects.equal(comment, that.comment) && Objects.equal(nullable, that.nullable) @@ -143,17 +153,19 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hashCode( - id, name, dataType, comment, nullable, autoIncrement, defaultValue, auditInfo); + id, name, position, dataType, comment, nullable, autoIncrement, defaultValue, auditInfo); } public static Builder builder() { return new Builder(); } - public static ColumnEntity toColumnEntity(Column column, long uid, AuditInfo audit) { + public static ColumnEntity toColumnEntity( + Column column, int position, long uid, AuditInfo audit) { return builder() .withId(uid) .withName(column.name()) + .withPosition(position) .withComment(column.comment()) .withDataType(column.dataType()) .withNullable(column.nullable()) @@ -180,6 +192,11 @@ public Builder withName(String name) { return this; } + public Builder withPosition(Integer position) { + columnEntity.position = position; + return this; + } + public Builder withDataType(Type dataType) { columnEntity.dataType = dataType; return this; diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableColumnBaseSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableColumnBaseSQLProvider.java index 0af9889ba6a..cdc32425b6f 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableColumnBaseSQLProvider.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableColumnBaseSQLProvider.java @@ -28,6 +28,7 @@ public class TableColumnBaseSQLProvider { public String listColumnPOsByTableIdAndVersion( @Param("tableId") Long tableId, @Param("tableVersion") Long tableVersion) { return "SELECT t1.column_id AS columnId, t1.column_name AS columnName," + + " t1.column_position AS columnPosition," + " t1.metalake_id AS metalakeId, t1.catalog_id AS catalogId," + " t1.schema_id AS schemaId, t1.table_id AS tableId," + " t1.table_version AS tableVersion, t1.column_type AS columnType," @@ -50,15 +51,16 @@ public String insertColumnPOs(@Param("columnPOs") List columnPOs) { return ""; } diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/po/ColumnPO.java b/core/src/main/java/org/apache/gravitino/storage/relational/po/ColumnPO.java index 46c79f97320..9238d88503b 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/po/ColumnPO.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/po/ColumnPO.java @@ -127,6 +127,8 @@ public static AutoIncrement fromBoolean(boolean autoIncrement) { private String columnName; + private Integer columnPosition; + private Long metalakeId; private Long catalogId; @@ -177,6 +179,11 @@ public Builder withColumnName(String columnName) { return this; } + public Builder withColumnPosition(Integer columnPosition) { + columnPO.columnPosition = columnPosition; + return this; + } + public Builder withMetalakeId(Long metalakeId) { columnPO.metalakeId = metalakeId; return this; @@ -247,6 +254,7 @@ public ColumnPO build() { Preconditions.checkArgument( StringUtils.isNotBlank(columnPO.columnName), "Column name is required and cannot be blank"); + Preconditions.checkArgument(columnPO.columnPosition != null, "Column position is required"); Preconditions.checkArgument(columnPO.metalakeId != null, "Metalake id is required"); Preconditions.checkArgument(columnPO.catalogId != null, "Catalog id is required"); Preconditions.checkArgument(columnPO.schemaId != null, "Schema id is required"); diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableColumnMetaService.java b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableColumnMetaService.java index 7ec975d45f8..f881602bc53 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableColumnMetaService.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableColumnMetaService.java @@ -110,12 +110,14 @@ void updateColumnPOsFromTableDiff( List columnPOsToInsert = Lists.newArrayList(); for (ColumnEntity newColumn : newColumns.values()) { ColumnEntity oldColumn = oldColumns.get(newColumn.id()); + // If the column is not existed in old columns, or if the column is updated, mark it as UPDATE if (oldColumn == null || !oldColumn.equals(newColumn)) { columnPOsToInsert.add( POConverters.initializeColumnPO(newTablePO, newColumn, ColumnPO.ColumnOpType.UPDATE)); } } + // Mark the columns to DELETE if they are not existed in new columns. for (ColumnEntity oldColumn : oldColumns.values()) { if (!newColumns.containsKey(oldColumn.id())) { columnPOsToInsert.add( diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java index dabce09cb2f..4cccd06759f 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java @@ -452,6 +452,7 @@ public static ColumnEntity fromColumnPO(ColumnPO columnPO) { return ColumnEntity.builder() .withId(columnPO.getColumnId()) .withName(columnPO.getColumnName()) + .withPosition(columnPO.getColumnPosition()) .withDataType(JsonUtils.anyFieldMapper().readValue(columnPO.getColumnType(), Type.class)) .withComment(columnPO.getColumnComment()) .withAutoIncrement( @@ -482,6 +483,7 @@ public static ColumnPO initializeColumnPO( return ColumnPO.builder() .withColumnId(columnEntity.id()) .withColumnName(columnEntity.name()) + .withColumnPosition(columnEntity.position()) .withMetalakeId(tablePO.getMetalakeId()) .withCatalogId(tablePO.getCatalogId()) .withSchemaId(tablePO.getSchemaId()) diff --git a/core/src/test/java/org/apache/gravitino/TestColumn.java b/core/src/test/java/org/apache/gravitino/TestColumn.java index 7085da6d3fb..77410652465 100644 --- a/core/src/test/java/org/apache/gravitino/TestColumn.java +++ b/core/src/test/java/org/apache/gravitino/TestColumn.java @@ -26,17 +26,40 @@ @ToString public class TestColumn extends BaseColumn { + private int position; + private TestColumn() {} + public int position() { + return position; + } + + public void setPosition(int position) { + this.position = position; + } + public static class Builder extends BaseColumn.BaseColumnBuilder { + + private Integer position; + /** Creates a new instance of {@link Builder}. */ private Builder() {} + public Builder withPosition(int position) { + this.position = position; + return this; + } + @Override protected TestColumn internalBuild() { TestColumn column = new TestColumn(); + if (position == null) { + throw new IllegalArgumentException("Position is required"); + } + column.name = name; + column.position = position; column.comment = comment; column.dataType = dataType; column.nullable = nullable; diff --git a/core/src/test/java/org/apache/gravitino/catalog/TestTableNormalizeDispatcher.java b/core/src/test/java/org/apache/gravitino/catalog/TestTableNormalizeDispatcher.java index c45f5cab27d..1401f990ec2 100644 --- a/core/src/test/java/org/apache/gravitino/catalog/TestTableNormalizeDispatcher.java +++ b/core/src/test/java/org/apache/gravitino/catalog/TestTableNormalizeDispatcher.java @@ -75,8 +75,16 @@ public void testNameCaseInsensitive() { NameIdentifier tableIdent = NameIdentifier.of(tableNs, "tableNAME"); Column[] columns = new Column[] { - TestColumn.builder().withName("colNAME1").withType(Types.StringType.get()).build(), - TestColumn.builder().withName("colNAME2").withType(Types.StringType.get()).build() + TestColumn.builder() + .withName("colNAME1") + .withPosition(0) + .withType(Types.StringType.get()) + .build(), + TestColumn.builder() + .withName("colNAME2") + .withPosition(1) + .withType(Types.StringType.get()) + .build() }; RangePartition assignedPartition = Partitions.range( @@ -143,8 +151,16 @@ public void testNameSpec() { NameIdentifier.of(tableNs, MetadataObjects.METADATA_OBJECT_RESERVED_NAME); Column[] columns = new Column[] { - TestColumn.builder().withName("colNAME1").withType(Types.StringType.get()).build(), - TestColumn.builder().withName("colNAME2").withType(Types.StringType.get()).build() + TestColumn.builder() + .withName("colNAME1") + .withPosition(0) + .withType(Types.StringType.get()) + .build(), + TestColumn.builder() + .withName("colNAME2") + .withPosition(1) + .withType(Types.StringType.get()) + .build() }; Exception exception = Assertions.assertThrows( @@ -166,6 +182,7 @@ public void testNameSpec() { new Column[] { TestColumn.builder() .withName(MetadataObjects.METADATA_OBJECT_RESERVED_NAME) + .withPosition(0) .withType(Types.StringType.get()) .build() }; diff --git a/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java b/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java index f31b95e1e78..56322acf285 100644 --- a/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java +++ b/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java @@ -96,7 +96,16 @@ public void testCreateAndListTables() throws IOException { NameIdentifier tableIdent1 = NameIdentifier.of(tableNs, "table1"); Column[] columns = new Column[] { - Column.of("col1", Types.StringType.get()), Column.of("col2", Types.StringType.get()) + TestColumn.builder() + .withName("col1") + .withPosition(0) + .withType(Types.StringType.get()) + .build(), + TestColumn.builder() + .withName("col2") + .withPosition(1) + .withType(Types.StringType.get()) + .build() }; Table table1 = @@ -175,8 +184,16 @@ public void testCreateAndLoadTable() throws IOException { NameIdentifier tableIdent1 = NameIdentifier.of(tableNs, "table11"); Column[] columns = new Column[] { - TestColumn.builder().withName("col1").withType(Types.StringType.get()).build(), - TestColumn.builder().withName("col2").withType(Types.StringType.get()).build() + TestColumn.builder() + .withName("col1") + .withPosition(0) + .withType(Types.StringType.get()) + .build(), + TestColumn.builder() + .withName("col2") + .withPosition(1) + .withType(Types.StringType.get()) + .build() }; Table table1 = @@ -244,8 +261,16 @@ public void testCreateAndAlterTable() throws IOException { NameIdentifier tableIdent = NameIdentifier.of(tableNs, "table21"); Column[] columns = new Column[] { - TestColumn.builder().withName("col1").withType(Types.StringType.get()).build(), - TestColumn.builder().withName("col2").withType(Types.StringType.get()).build() + TestColumn.builder() + .withName("col1") + .withPosition(0) + .withType(Types.StringType.get()) + .build(), + TestColumn.builder() + .withName("col2") + .withPosition(1) + .withType(Types.StringType.get()) + .build() }; Table table = @@ -310,8 +335,16 @@ public void testCreateAndDropTable() throws IOException { Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); Column[] columns = new Column[] { - TestColumn.builder().withName("col1").withType(Types.StringType.get()).build(), - TestColumn.builder().withName("col2").withType(Types.StringType.get()).build() + TestColumn.builder() + .withName("col1") + .withPosition(0) + .withType(Types.StringType.get()) + .build(), + TestColumn.builder() + .withName("col2") + .withPosition(1) + .withType(Types.StringType.get()) + .build() }; schemaOperationDispatcher.createSchema( @@ -343,8 +376,16 @@ public void testCreateTableNeedImportingSchema() throws IOException { NameIdentifier.of(tableNs.levels()), "", Collections.emptyMap()); Column[] columns = new Column[] { - TestColumn.builder().withName("col1").withType(Types.StringType.get()).build(), - TestColumn.builder().withName("col2").withType(Types.StringType.get()).build() + TestColumn.builder() + .withName("col1") + .withPosition(0) + .withType(Types.StringType.get()) + .build(), + TestColumn.builder() + .withName("col2") + .withPosition(1) + .withType(Types.StringType.get()) + .build() }; tableOperationDispatcher.createTable(tableIdent, columns, "comment", props); Assertions.assertTrue(entityStore.exists(NameIdentifier.of(tableNs.levels()), SCHEMA)); @@ -362,6 +403,7 @@ public void testCreateAndLoadTableWithColumn() throws IOException { new Column[] { TestColumn.builder() .withName("col1") + .withPosition(0) .withType(Types.StringType.get()) .withComment("comment1") .withNullable(true) @@ -370,6 +412,7 @@ public void testCreateAndLoadTableWithColumn() throws IOException { .build(), TestColumn.builder() .withName("col2") + .withPosition(1) .withType(Types.StringType.get()) .withComment("comment2") .withNullable(false) @@ -490,6 +533,7 @@ public void testCreateAndAlterTableWithColumn() throws IOException { new Column[] { TestColumn.builder() .withName("col1") + .withPosition(0) .withType(Types.StringType.get()) .withComment("comment1") .withNullable(true) @@ -498,6 +542,7 @@ public void testCreateAndAlterTableWithColumn() throws IOException { .build(), TestColumn.builder() .withName("col2") + .withPosition(1) .withType(Types.StringType.get()) .withComment("comment2") .withNullable(false) @@ -519,6 +564,7 @@ public void testCreateAndAlterTableWithColumn() throws IOException { new Column[] { TestColumn.builder() .withName("col3") + .withPosition(0) .withType(Types.StringType.get()) .withComment("comment1") .withNullable(true) @@ -527,6 +573,7 @@ public void testCreateAndAlterTableWithColumn() throws IOException { .build(), TestColumn.builder() .withName("col2") + .withPosition(1) .withType(Types.StringType.get()) .withComment("comment2") .withNullable(false) @@ -555,6 +602,7 @@ public void testCreateAndAlterTableWithColumn() throws IOException { new Column[] { TestColumn.builder() .withName("col4") + .withPosition(0) .withType(Types.StringType.get()) .withComment("comment4") .withNullable(true) @@ -563,6 +611,7 @@ public void testCreateAndAlterTableWithColumn() throws IOException { .build(), TestColumn.builder() .withName("col3") + .withPosition(1) .withType(Types.StringType.get()) .withComment("comment1") .withNullable(true) @@ -571,6 +620,7 @@ public void testCreateAndAlterTableWithColumn() throws IOException { .build(), TestColumn.builder() .withName("col2") + .withPosition(2) .withType(Types.StringType.get()) .withComment("comment2") .withNullable(false) @@ -594,6 +644,7 @@ public void testCreateAndAlterTableWithColumn() throws IOException { new Column[] { TestColumn.builder() .withName("col4") + .withPosition(0) .withType(Types.StringType.get()) .withComment("comment4") .withNullable(true) @@ -617,6 +668,7 @@ public void testCreateAndAlterTableWithColumn() throws IOException { new Column[] { TestColumn.builder() .withName("col4") + .withPosition(0) .withType(Types.StringType.get()) .withComment("comment4") .withNullable(true) @@ -639,6 +691,7 @@ public void testCreateAndAlterTableWithColumn() throws IOException { new Column[] { TestColumn.builder() .withName("col4") + .withPosition(0) .withType(Types.IntegerType.get()) .withComment("comment4") .withNullable(true) @@ -661,6 +714,7 @@ public void testCreateAndAlterTableWithColumn() throws IOException { new Column[] { TestColumn.builder() .withName("col4") + .withPosition(0) .withType(Types.IntegerType.get()) .withComment("new comment") .withNullable(true) @@ -683,6 +737,7 @@ public void testCreateAndAlterTableWithColumn() throws IOException { new Column[] { TestColumn.builder() .withName("col4") + .withPosition(0) .withType(Types.IntegerType.get()) .withComment("new comment") .withNullable(false) @@ -705,6 +760,7 @@ public void testCreateAndAlterTableWithColumn() throws IOException { new Column[] { TestColumn.builder() .withName("col4") + .withPosition(0) .withType(Types.IntegerType.get()) .withComment("new comment") .withNullable(false) @@ -730,6 +786,7 @@ public void testCreateAndDropTableWithColumn() throws IOException { new Column[] { TestColumn.builder() .withName("col1") + .withPosition(0) .withType(Types.StringType.get()) .withComment("comment1") .withNullable(true) @@ -738,6 +795,7 @@ public void testCreateAndDropTableWithColumn() throws IOException { .build(), TestColumn.builder() .withName("col2") + .withPosition(1) .withType(Types.StringType.get()) .withComment("comment2") .withNullable(false) @@ -772,14 +830,16 @@ private static void testColumns(Column[] expectedColumns, Column[] actualColumns Assertions.assertEquals(expectedColumnMap.size(), actualColumnMap.size()); expectedColumnMap.forEach( (name, expectedColumn) -> { - Column actualColumn = actualColumnMap.get(name); + TestColumn actualColumn = (TestColumn) actualColumnMap.get(name); + TestColumn e = (TestColumn) expectedColumn; Assertions.assertNotNull(actualColumn); - Assertions.assertEquals(expectedColumn.name().toLowerCase(), actualColumn.name()); - Assertions.assertEquals(expectedColumn.dataType(), actualColumn.dataType()); - Assertions.assertEquals(expectedColumn.comment(), actualColumn.comment()); - Assertions.assertEquals(expectedColumn.nullable(), actualColumn.nullable()); - Assertions.assertEquals(expectedColumn.autoIncrement(), actualColumn.autoIncrement()); - Assertions.assertEquals(expectedColumn.defaultValue(), actualColumn.defaultValue()); + Assertions.assertEquals(e.name().toLowerCase(), actualColumn.name()); + Assertions.assertEquals(e.position(), actualColumn.position()); + Assertions.assertEquals(e.dataType(), actualColumn.dataType()); + Assertions.assertEquals(e.comment(), actualColumn.comment()); + Assertions.assertEquals(e.nullable(), actualColumn.nullable()); + Assertions.assertEquals(e.autoIncrement(), actualColumn.autoIncrement()); + Assertions.assertEquals(e.defaultValue(), actualColumn.defaultValue()); }); } @@ -800,13 +860,15 @@ private static void testColumnAndColumnEntities( expectedColumnMap.forEach( (name, expectedColumn) -> { ColumnEntity actualColumn = actualColumnMap.get(name); + TestColumn e = (TestColumn) expectedColumn; Assertions.assertNotNull(actualColumn); - Assertions.assertEquals(expectedColumn.name(), actualColumn.name()); - Assertions.assertEquals(expectedColumn.dataType(), actualColumn.dataType()); - Assertions.assertEquals(expectedColumn.comment(), actualColumn.comment()); - Assertions.assertEquals(expectedColumn.nullable(), actualColumn.nullable()); - Assertions.assertEquals(expectedColumn.autoIncrement(), actualColumn.autoIncrement()); - Assertions.assertEquals(expectedColumn.defaultValue(), actualColumn.defaultValue()); + Assertions.assertEquals(e.name(), actualColumn.name()); + Assertions.assertEquals(e.position(), actualColumn.position()); + Assertions.assertEquals(e.dataType(), actualColumn.dataType()); + Assertions.assertEquals(e.comment(), actualColumn.comment()); + Assertions.assertEquals(e.nullable(), actualColumn.nullable()); + Assertions.assertEquals(e.autoIncrement(), actualColumn.autoIncrement()); + Assertions.assertEquals(e.defaultValue(), actualColumn.defaultValue()); }); } } diff --git a/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java b/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java index 13c4652058a..4fb98c596b8 100644 --- a/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java +++ b/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java @@ -24,10 +24,12 @@ import java.io.IOException; import java.time.Instant; import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.Catalog; import org.apache.gravitino.NameIdentifier; @@ -136,13 +138,29 @@ public Table createTable( AuditInfo auditInfo = AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build(); + TestColumn[] sortedColumns = + IntStream.range(0, columns.length) + .mapToObj( + i -> + TestColumn.builder() + .withName(columns[i].name()) + .withPosition(i) + .withComment(columns[i].comment()) + .withType(columns[i].dataType()) + .withNullable(columns[i].nullable()) + .withAutoIncrement(columns[i].autoIncrement()) + .withDefaultValue(columns[i].defaultValue()) + .build()) + .sorted(Comparator.comparingInt(TestColumn::position)) + .toArray(TestColumn[]::new); + TestTable table = TestTable.builder() .withName(ident.name()) .withComment(comment) .withProperties(new HashMap<>(properties)) .withAuditInfo(auditInfo) - .withColumns(columns) + .withColumns(sortedColumns) .withDistribution(distribution) .withSortOrders(sortOrders) .withPartitioning(partitions) @@ -160,7 +178,7 @@ public Table createTable( .withComment(comment) .withProperties(new HashMap<>(properties)) .withAuditInfo(auditInfo) - .withColumns(columns) + .withColumns(sortedColumns) .withDistribution(distribution) .withSortOrders(sortOrders) .withPartitioning(partitions) @@ -646,6 +664,39 @@ private boolean checkSingleFile(Fileset fileset) { } } + private Map updateColumnPositionsAfterColumnUpdate( + String updatedColumnName, + TableChange.ColumnPosition newColumnPosition, + Map allColumns) { + TestColumn updatedColumn = (TestColumn) allColumns.get(updatedColumnName); + int newPosition; + if (newColumnPosition instanceof TableChange.First) { + newPosition = 0; + } else if (newColumnPosition instanceof TableChange.Default) { + newPosition = allColumns.size() - 1; + } else if (newColumnPosition instanceof TableChange.After) { + String afterColumnName = ((TableChange.After) newColumnPosition).getColumn(); + Column afterColumn = allColumns.get(afterColumnName); + newPosition = ((TestColumn) afterColumn).position() + 1; + } else { + throw new IllegalArgumentException("Unsupported column position: " + newColumnPosition); + } + updatedColumn.setPosition(newPosition); + + allColumns.forEach( + (columnName, column) -> { + if (columnName.equals(updatedColumnName)) { + return; + } + TestColumn testColumn = (TestColumn) column; + if (testColumn.position() >= newPosition) { + testColumn.setPosition(testColumn.position() + 1); + } + }); + + return allColumns; + } + private Column[] updateColumns(Column[] columns, TableChange.ColumnChange[] columnChanges) { Map columnMap = Arrays.stream(columns).collect(Collectors.toMap(Column::name, Function.identity())); @@ -656,6 +707,7 @@ private Column[] updateColumns(Column[] columns, TableChange.ColumnChange[] colu TestColumn column = TestColumn.builder() .withName(String.join(".", addColumn.fieldName())) + .withPosition(columnMap.size()) .withComment(addColumn.getComment()) .withType(addColumn.getDataType()) .withNullable(addColumn.isNullable()) @@ -663,9 +715,18 @@ private Column[] updateColumns(Column[] columns, TableChange.ColumnChange[] colu .withDefaultValue(addColumn.getDefaultValue()) .build(); columnMap.put(column.name(), column); + updateColumnPositionsAfterColumnUpdate(column.name(), addColumn.getPosition(), columnMap); } else if (columnChange instanceof TableChange.DeleteColumn) { - columnMap.remove(String.join(".", columnChange.fieldName())); + TestColumn removedColumn = + (TestColumn) columnMap.remove(String.join(".", columnChange.fieldName())); + columnMap.forEach( + (columnName, column) -> { + TestColumn testColumn = (TestColumn) column; + if (testColumn.position() > removedColumn.position()) { + testColumn.setPosition(testColumn.position() - 1); + } + }); } else if (columnChange instanceof TableChange.RenameColumn) { String oldName = String.join(".", columnChange.fieldName()); @@ -674,6 +735,7 @@ private Column[] updateColumns(Column[] columns, TableChange.ColumnChange[] colu TestColumn newColumn = TestColumn.builder() .withName(newName) + .withPosition(((TestColumn) column).position()) .withComment(column.comment()) .withType(column.dataType()) .withNullable(column.nullable()) @@ -690,6 +752,7 @@ private Column[] updateColumns(Column[] columns, TableChange.ColumnChange[] colu TestColumn newColumn = TestColumn.builder() .withName(columnName) + .withPosition(((TestColumn) oldColumn).position()) .withComment(oldColumn.comment()) .withType(oldColumn.dataType()) .withNullable(oldColumn.nullable()) @@ -705,6 +768,7 @@ private Column[] updateColumns(Column[] columns, TableChange.ColumnChange[] colu TestColumn newColumn = TestColumn.builder() .withName(columnName) + .withPosition(((TestColumn) oldColumn).position()) .withComment(oldColumn.comment()) .withType(updateColumnType.getNewDataType()) .withNullable(oldColumn.nullable()) @@ -721,6 +785,7 @@ private Column[] updateColumns(Column[] columns, TableChange.ColumnChange[] colu TestColumn newColumn = TestColumn.builder() .withName(columnName) + .withPosition(((TestColumn) oldColumn).position()) .withComment(updateColumnComment.getNewComment()) .withType(oldColumn.dataType()) .withNullable(oldColumn.nullable()) @@ -737,6 +802,7 @@ private Column[] updateColumns(Column[] columns, TableChange.ColumnChange[] colu TestColumn newColumn = TestColumn.builder() .withName(columnName) + .withPosition(((TestColumn) oldColumn).position()) .withComment(oldColumn.comment()) .withType(oldColumn.dataType()) .withNullable(updateColumnNullable.nullable()) @@ -753,6 +819,7 @@ private Column[] updateColumns(Column[] columns, TableChange.ColumnChange[] colu TestColumn newColumn = TestColumn.builder() .withName(columnName) + .withPosition(((TestColumn) oldColumn).position()) .withComment(oldColumn.comment()) .withType(oldColumn.dataType()) .withNullable(oldColumn.nullable()) @@ -761,10 +828,22 @@ private Column[] updateColumns(Column[] columns, TableChange.ColumnChange[] colu .build(); columnMap.put(columnName, newColumn); + } else if (columnChange instanceof TableChange.UpdateColumnPosition) { + String columnName = String.join(".", columnChange.fieldName()); + TableChange.UpdateColumnPosition updateColumnPosition = + (TableChange.UpdateColumnPosition) columnChange; + columnMap = + updateColumnPositionsAfterColumnUpdate( + columnName, updateColumnPosition.getPosition(), columnMap); + } else { - // do nothing + throw new IllegalArgumentException("Unsupported column change: " + columnChange); } } - return columnMap.values().toArray(new Column[0]); + + return columnMap.values().stream() + .map(TestColumn.class::cast) + .sorted(Comparator.comparingInt(TestColumn::position)) + .toArray(TestColumn[]::new); } } diff --git a/core/src/test/java/org/apache/gravitino/meta/TestColumnEntity.java b/core/src/test/java/org/apache/gravitino/meta/TestColumnEntity.java index e7ad7e7d076..519bcfca362 100644 --- a/core/src/test/java/org/apache/gravitino/meta/TestColumnEntity.java +++ b/core/src/test/java/org/apache/gravitino/meta/TestColumnEntity.java @@ -36,6 +36,7 @@ public void testColumnEntityFields() { ColumnEntity.builder() .withId(1L) .withName("test") + .withPosition(1) .withComment("test comment") .withDataType(Types.IntegerType.get()) .withNullable(true) @@ -47,6 +48,7 @@ public void testColumnEntityFields() { Assertions.assertEquals(1L, columnEntity.id()); Assertions.assertEquals("test", columnEntity.name()); + Assertions.assertEquals(1, columnEntity.position()); Assertions.assertEquals("test comment", columnEntity.comment()); Assertions.assertEquals(Types.IntegerType.get(), columnEntity.dataType()); Assertions.assertTrue(columnEntity.nullable()); @@ -57,6 +59,7 @@ public void testColumnEntityFields() { ColumnEntity.builder() .withId(1L) .withName("test") + .withPosition(1) .withDataType(Types.IntegerType.get()) .withNullable(true) .withAutoIncrement(true) @@ -70,6 +73,7 @@ public void testColumnEntityFields() { ColumnEntity.builder() .withId(1L) .withName("test") + .withPosition(1) .withComment("test comment") .withDataType(Types.IntegerType.get()) .withNullable(true) @@ -87,6 +91,7 @@ public void testWithoutRequiredFields() { () -> { ColumnEntity.builder() .withId(1L) + .withPosition(1) .withName("test") .withNullable(true) .withAutoIncrement(true) @@ -101,6 +106,7 @@ public void testWithoutRequiredFields() { () -> { ColumnEntity.builder() .withId(1L) + .withPosition(1) .withComment("test comment") .withDataType(Types.IntegerType.get()) .withAutoIncrement(true) @@ -115,6 +121,7 @@ public void testWithoutRequiredFields() { () -> { ColumnEntity.builder() .withId(1L) + .withName("test") .withComment("test comment") .withDataType(Types.IntegerType.get()) .withNullable(true) @@ -143,6 +150,7 @@ public void testTableColumnEntity() { ColumnEntity.builder() .withId(1L) .withName("test") + .withPosition(1) .withComment("test comment") .withDataType(Types.IntegerType.get()) .withNullable(true) @@ -156,6 +164,7 @@ public void testTableColumnEntity() { ColumnEntity.builder() .withId(2L) .withName("test2") + .withPosition(2) .withComment("test comment2") .withDataType(Types.StringType.get()) .withNullable(true) @@ -169,6 +178,7 @@ public void testTableColumnEntity() { ColumnEntity.builder() .withId(3L) .withName("test3") + .withPosition(3) .withComment("test comment3") .withDataType(Types.BooleanType.get()) .withNullable(true) diff --git a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableColumnMetaService.java b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableColumnMetaService.java index 0b15d1e0f5c..8d61d357cc7 100644 --- a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableColumnMetaService.java +++ b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableColumnMetaService.java @@ -76,6 +76,7 @@ public void testInsertAndGetTableColumns() throws IOException { ColumnEntity.builder() .withId(RandomIdGenerator.INSTANCE.nextId()) .withName("column1") + .withPosition(0) .withComment("comment1") .withDataType(Types.IntegerType.get()) .withNullable(true) @@ -87,6 +88,7 @@ public void testInsertAndGetTableColumns() throws IOException { ColumnEntity.builder() .withId(RandomIdGenerator.INSTANCE.nextId()) .withName("column2") + .withPosition(1) .withComment("comment2") .withDataType(Types.StringType.get()) .withNullable(false) @@ -118,6 +120,7 @@ public void testInsertAndGetTableColumns() throws IOException { ColumnEntity.builder() .withId(RandomIdGenerator.INSTANCE.nextId()) .withName("column3") + .withPosition(0) .withComment("comment3") .withDataType(Types.IntegerType.get()) .withNullable(true) @@ -187,6 +190,7 @@ public void testUpdateTable() throws IOException { ColumnEntity.builder() .withId(RandomIdGenerator.INSTANCE.nextId()) .withName("column1") + .withPosition(0) .withComment("comment1") .withDataType(Types.IntegerType.get()) .withNullable(true) @@ -222,6 +226,7 @@ public void testUpdateTable() throws IOException { ColumnEntity.builder() .withId(RandomIdGenerator.INSTANCE.nextId()) .withName("column2") + .withPosition(1) .withComment("comment2") .withDataType(Types.StringType.get()) .withNullable(false) @@ -257,6 +262,7 @@ public void testUpdateTable() throws IOException { ColumnEntity.builder() .withId(column1.id()) .withName(column1.name()) + .withPosition(column1.position()) .withComment("comment1_updated") .withDataType(Types.LongType.get()) .withNullable(column1.nullable()) @@ -331,6 +337,7 @@ public void testCreateAndDeleteTable() throws IOException { ColumnEntity.builder() .withId(RandomIdGenerator.INSTANCE.nextId()) .withName("column1") + .withPosition(0) .withComment("comment1") .withDataType(Types.IntegerType.get()) .withNullable(true) @@ -377,6 +384,7 @@ public void testDeleteMetalake() throws IOException { ColumnEntity.builder() .withId(RandomIdGenerator.INSTANCE.nextId()) .withName("column1") + .withPosition(0) .withComment("comment1") .withDataType(Types.IntegerType.get()) .withNullable(true) @@ -423,6 +431,7 @@ private void compareTwoColumns( Assertions.assertNotNull(expectedColumn); Assertions.assertEquals(expectedColumn.id(), column.id()); Assertions.assertEquals(expectedColumn.name(), column.name()); + Assertions.assertEquals(expectedColumn.position(), column.position()); Assertions.assertEquals(expectedColumn.comment(), column.comment()); Assertions.assertEquals(expectedColumn.dataType(), column.dataType()); Assertions.assertEquals(expectedColumn.nullable(), column.nullable()); diff --git a/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java b/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java index e97e650a144..76a2c35d317 100644 --- a/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java +++ b/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java @@ -160,6 +160,7 @@ public void testFromColumnPO() throws JsonProcessingException { createColumnPO( 1L, "test", + 0, 1L, 1L, 1L, @@ -173,7 +174,7 @@ public void testFromColumnPO() throws JsonProcessingException { ColumnEntity expectedColumn = createColumn( - 1L, "test", Types.IntegerType.get(), "test", true, true, Literals.integerLiteral(1)); + 1L, "test", 0, Types.IntegerType.get(), "test", true, true, Literals.integerLiteral(1)); ColumnEntity convertedColumn = POConverters.fromColumnPO(columnPO); assertEquals(expectedColumn.id(), convertedColumn.id()); @@ -189,6 +190,7 @@ public void testFromColumnPO() throws JsonProcessingException { createColumnPO( 1L, "test", + 0, 1L, 1L, 1L, @@ -202,7 +204,7 @@ public void testFromColumnPO() throws JsonProcessingException { ColumnEntity expectedColumn1 = createColumn( - 1L, "test", Types.IntegerType.get(), null, true, true, Literals.integerLiteral(1)); + 1L, "test", 0, Types.IntegerType.get(), null, true, true, Literals.integerLiteral(1)); ColumnEntity convertedColumn1 = POConverters.fromColumnPO(columnPO1); assertEquals(expectedColumn1.comment(), convertedColumn1.comment()); @@ -215,6 +217,7 @@ public void testFromTableColumnPOs() throws JsonProcessingException { createColumnPO( 1L, "test1", + 0, 1L, 1L, 1L, @@ -230,6 +233,7 @@ public void testFromTableColumnPOs() throws JsonProcessingException { createColumnPO( 2L, "test2", + 1, 1L, 1L, 1L, @@ -243,11 +247,25 @@ public void testFromTableColumnPOs() throws JsonProcessingException { ColumnEntity expectedColumn1 = createColumn( - 1L, "test1", Types.IntegerType.get(), "test1", true, true, Literals.integerLiteral(1)); + 1L, + "test1", + 0, + Types.IntegerType.get(), + "test1", + true, + true, + Literals.integerLiteral(1)); ColumnEntity expectedColumn2 = createColumn( - 2L, "test2", Types.StringType.get(), "test2", true, true, Literals.stringLiteral("1")); + 2L, + "test2", + 1, + Types.StringType.get(), + "test2", + true, + true, + Literals.stringLiteral("1")); TableEntity expectedTable = createTableWithColumns( @@ -962,6 +980,7 @@ private static TablePO createTablePO( private static ColumnPO createColumnPO( Long id, String columnName, + Integer columnPosition, Long metalakeId, Long catalogId, Long schemaId, @@ -978,6 +997,7 @@ private static ColumnPO createColumnPO( return ColumnPO.builder() .withColumnId(id) .withColumnName(columnName) + .withColumnPosition(columnPosition) .withMetalakeId(metalakeId) .withCatalogId(catalogId) .withSchemaId(schemaId) @@ -999,6 +1019,7 @@ private static ColumnPO createColumnPO( private static ColumnEntity createColumn( Long id, String columnName, + Integer columnPosition, Type columnType, String columnComment, boolean columnNullable, @@ -1009,6 +1030,7 @@ private static ColumnEntity createColumn( return ColumnEntity.builder() .withId(id) .withName(columnName) + .withPosition(columnPosition) .withDataType(columnType) .withComment(columnComment) .withNullable(columnNullable) diff --git a/scripts/h2/schema-0.7.0-h2.sql b/scripts/h2/schema-0.7.0-h2.sql index bada37abc32..26853135120 100644 --- a/scripts/h2/schema-0.7.0-h2.sql +++ b/scripts/h2/schema-0.7.0-h2.sql @@ -93,6 +93,7 @@ CREATE TABLE IF NOT EXISTS `table_column_version_info` ( `table_version` INT UNSIGNED NOT NULL COMMENT 'table version', `column_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'column id', `column_name` VARCHAR(128) NOT NULL COMMENT 'column name', + `column_position` INT UNSIGNED NOT NULL COMMENT 'column position', `column_type` TEXT NOT NULL COMMENT 'column type', `column_comment` VARCHAR(256) DEFAULT '' COMMENT 'column comment', `column_nullable` TINYINT(1) NOT NULL DEFAULT 1 COMMENT 'column nullable, 0 is not nullable, 1 is nullable', diff --git a/scripts/h2/upgrade-0.6.0-to-0.7.0-h2.sql b/scripts/h2/upgrade-0.6.0-to-0.7.0-h2.sql index cdf1bbdc432..657cd266955 100644 --- a/scripts/h2/upgrade-0.6.0-to-0.7.0-h2.sql +++ b/scripts/h2/upgrade-0.6.0-to-0.7.0-h2.sql @@ -25,6 +25,7 @@ CREATE TABLE IF NOT EXISTS `table_column_version_info` ( `table_version` INT UNSIGNED NOT NULL COMMENT 'table version', `column_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'column id', `column_name` VARCHAR(128) NOT NULL COMMENT 'column name', + `column_position` INT UNSIGNED NOT NULL COMMENT 'column position', `column_type` TEXT NOT NULL COMMENT 'column type', `column_comment` VARCHAR(256) DEFAULT '' COMMENT 'column comment', `column_nullable` TINYINT(1) NOT NULL DEFAULT 1 COMMENT 'column nullable, 0 is not nullable, 1 is nullable', diff --git a/scripts/mysql/schema-0.7.0-mysql.sql b/scripts/mysql/schema-0.7.0-mysql.sql index 13f46debc0d..0e43dff37c5 100644 --- a/scripts/mysql/schema-0.7.0-mysql.sql +++ b/scripts/mysql/schema-0.7.0-mysql.sql @@ -88,6 +88,7 @@ CREATE TABLE IF NOT EXISTS `table_column_version_info` ( `table_version` INT UNSIGNED NOT NULL COMMENT 'table version', `column_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'column id', `column_name` VARCHAR(128) NOT NULL COMMENT 'column name', + `column_position` INT UNSIGNED NOT NULL COMMENT 'column position', `column_type` TEXT NOT NULL COMMENT 'column type', `column_comment` VARCHAR(256) DEFAULT '' COMMENT 'column comment', `column_nullable` TINYINT(1) NOT NULL DEFAULT 1 COMMENT 'column nullable, 0 is not nullable, 1 is nullable', diff --git a/scripts/mysql/upgrade-0.6.0-to-0.7.0-mysql.sql b/scripts/mysql/upgrade-0.6.0-to-0.7.0-mysql.sql index 0afe5607841..9d6152d461a 100644 --- a/scripts/mysql/upgrade-0.6.0-to-0.7.0-mysql.sql +++ b/scripts/mysql/upgrade-0.6.0-to-0.7.0-mysql.sql @@ -25,6 +25,7 @@ CREATE TABLE IF NOT EXISTS `table_column_version_info` ( `table_version` INT UNSIGNED NOT NULL COMMENT 'table version', `column_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'column id', `column_name` VARCHAR(128) NOT NULL COMMENT 'column name', + `column_position` INT UNSIGNED NOT NULL COMMENT 'column position', `column_type` TEXT NOT NULL COMMENT 'column type', `column_comment` VARCHAR(256) DEFAULT '' COMMENT 'column comment', `column_nullable` TINYINT(1) NOT NULL DEFAULT 1 COMMENT 'column nullable, 0 is not nullable, 1 is nullable', diff --git a/scripts/postgresql/schema-0.7.0-postgresql.sql b/scripts/postgresql/schema-0.7.0-postgresql.sql index d377c57b556..f407b3f1921 100644 --- a/scripts/postgresql/schema-0.7.0-postgresql.sql +++ b/scripts/postgresql/schema-0.7.0-postgresql.sql @@ -149,6 +149,7 @@ CREATE TABLE IF NOT EXISTS table_column_version_info ( table_version INT NOT NULL, column_id BIGINT NOT NULL, column_name VARCHAR(128) NOT NULL, + column_position INT NOT NULL, column_type TEXT NOT NULL, column_comment VARCHAR(256) DEFAULT '', column_nullable SMALLINT NOT NULL DEFAULT 1,