Skip to content

Commit

Permalink
Add position support for column entity
Browse files Browse the repository at this point in the history
  • Loading branch information
jerryshao committed Oct 17, 2024
1 parent 075a851 commit cf32e54
Show file tree
Hide file tree
Showing 18 changed files with 320 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.gravitino.EntityStore;
import org.apache.gravitino.GravitinoEnv;
Expand Down Expand Up @@ -499,8 +500,8 @@ private Table internalCreateTable(
.withCreateTime(Instant.now())
.build();
List<ColumnEntity> columnEntityList =
Arrays.stream(columns)
.map(c -> ColumnEntity.toColumnEntity(c, idGenerator.nextId(), audit))
IntStream.range(0, columns.length)
.mapToObj(i -> ColumnEntity.toColumnEntity(columns[i], i, idGenerator.nextId(), audit))
.collect(Collectors.toList());

TableEntity tableEntity =
Expand Down Expand Up @@ -531,13 +532,14 @@ private Table internalCreateTable(
private List<ColumnEntity> toColumnEntities(Column[] columns, AuditInfo audit) {
return columns == null
? Collections.emptyList()
: Arrays.stream(columns)
.map(c -> ColumnEntity.toColumnEntity(c, idGenerator.nextId(), audit))
: IntStream.range(0, columns.length)
.mapToObj(i -> ColumnEntity.toColumnEntity(columns[i], i, idGenerator.nextId(), audit))
.collect(Collectors.toList());
}

private boolean isSameColumn(Column left, ColumnEntity right) {
private boolean isSameColumn(Column left, int columnPosition, ColumnEntity right) {
return Objects.equal(left.name(), right.name())
&& columnPosition == right.position()
&& Objects.equal(left.dataType(), right.dataType())
&& Objects.equal(left.comment(), right.comment())
&& left.nullable() == right.nullable()
Expand All @@ -554,11 +556,12 @@ private Pair<Boolean, List<ColumnEntity>> updateColumnsIfNecessary(
return Pair.of(false, Collections.emptyList());
}

Map<String, Column> columnsFromCatalogTable =
Map<String, Pair<Integer, Column>> columnsFromCatalogTable =
tableFromCatalog.columns() == null
? Collections.emptyMap()
: Arrays.stream(tableFromCatalog.columns())
.collect(Collectors.toMap(Column::name, Function.identity()));
: IntStream.range(0, tableFromCatalog.columns().length)
.mapToObj(i -> Pair.of(i, tableFromCatalog.columns()[i]))
.collect(Collectors.toMap(p -> p.getRight().name(), Function.identity()));
Map<String, ColumnEntity> columnsFromTableEntity =
tableFromGravitino.columns() == null
? Collections.emptyMap()
Expand All @@ -569,25 +572,27 @@ private Pair<Boolean, List<ColumnEntity>> updateColumnsIfNecessary(
List<ColumnEntity> columnsToInsert = Lists.newArrayList();
boolean columnsNeedsUpdate = false;
for (Map.Entry<String, ColumnEntity> entry : columnsFromTableEntity.entrySet()) {
Column column = columnsFromCatalogTable.get(entry.getKey());
if (column == null) {
Pair<Integer, Column> columnPair = columnsFromCatalogTable.get(entry.getKey());
if (columnPair == null) {
LOG.debug(
"Column {} is not found in the table from underlying source, it will be removed"
+ " from the table entity",
entry.getKey());
columnsNeedsUpdate = true;

} else if (!isSameColumn(column, entry.getValue())) {
} else if (!isSameColumn(columnPair.getRight(), columnPair.getLeft(), entry.getValue())) {
// If the column need to be updated, we create a new ColumnEntity with the same id
LOG.debug(
"Column {} is found in the table from underlying source, but it is different "
+ "from the one in the table entity, it will be updated",
entry.getKey());

Column column = columnPair.getRight();
ColumnEntity updatedColumnEntity =
ColumnEntity.builder()
.withId(entry.getValue().id())
.withName(column.name())
.withPosition(columnPair.getLeft())
.withDataType(column.dataType())
.withComment(column.comment())
.withNullable(column.nullable())
Expand All @@ -612,15 +617,16 @@ private Pair<Boolean, List<ColumnEntity>> updateColumnsIfNecessary(
}

// Check if there are new columns in the table from the underlying source
for (Map.Entry<String, Column> entry : columnsFromCatalogTable.entrySet()) {
for (Map.Entry<String, Pair<Integer, Column>> entry : columnsFromCatalogTable.entrySet()) {
if (!columnsFromTableEntity.containsKey(entry.getKey())) {
LOG.debug(
"Column {} is found in the table from underlying source but not in the table "
+ "entity, it will be added to the table entity",
entry.getKey());
ColumnEntity newColumnEntity =
ColumnEntity.toColumnEntity(
entry.getValue(),
entry.getValue().getRight(),
entry.getValue().getLeft(),
idGenerator.nextId(),
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
Expand Down
21 changes: 19 additions & 2 deletions core/src/main/java/org/apache/gravitino/meta/ColumnEntity.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class ColumnEntity implements Entity, Auditable {

public static final Field ID = Field.required("id", Long.class, "The column's unique identifier");
public static final Field NAME = Field.required("name", String.class, "The column's name");
public static final Field POSITION =
Field.required("position", Integer.class, "The column's position");
public static final Field TYPE = Field.required("dataType", Type.class, "The column's data type");
public static final Field COMMENT =
Field.optional("comment", String.class, "The column's comment");
Expand All @@ -57,6 +59,8 @@ public class ColumnEntity implements Entity, Auditable {

private String name;

private Integer position;

private Type dataType;

private String comment;
Expand All @@ -76,6 +80,7 @@ public Map<Field, Object> fields() {
Map<Field, Object> fields = Maps.newHashMap();
fields.put(ID, id);
fields.put(NAME, name);
fields.put(POSITION, position);
fields.put(TYPE, dataType);
fields.put(COMMENT, comment);
fields.put(NULLABLE, nullable);
Expand Down Expand Up @@ -104,6 +109,10 @@ public String name() {
return name;
}

public Integer position() {
return position;
}

public Type dataType() {
return dataType;
}
Expand Down Expand Up @@ -132,6 +141,7 @@ public boolean equals(Object o) {
ColumnEntity that = (ColumnEntity) o;
return Objects.equal(id, that.id)
&& Objects.equal(name, that.name)
&& Objects.equal(position, that.position)
&& Objects.equal(dataType, that.dataType)
&& Objects.equal(comment, that.comment)
&& Objects.equal(nullable, that.nullable)
Expand All @@ -143,17 +153,19 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
return Objects.hashCode(
id, name, dataType, comment, nullable, autoIncrement, defaultValue, auditInfo);
id, name, position, dataType, comment, nullable, autoIncrement, defaultValue, auditInfo);
}

public static Builder builder() {
return new Builder();
}

public static ColumnEntity toColumnEntity(Column column, long uid, AuditInfo audit) {
public static ColumnEntity toColumnEntity(
Column column, int position, long uid, AuditInfo audit) {
return builder()
.withId(uid)
.withName(column.name())
.withPosition(position)
.withComment(column.comment())
.withDataType(column.dataType())
.withNullable(column.nullable())
Expand All @@ -180,6 +192,11 @@ public Builder withName(String name) {
return this;
}

public Builder withPosition(Integer position) {
columnEntity.position = position;
return this;
}

public Builder withDataType(Type dataType) {
columnEntity.dataType = dataType;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class TableColumnBaseSQLProvider {
public String listColumnPOsByTableIdAndVersion(
@Param("tableId") Long tableId, @Param("tableVersion") Long tableVersion) {
return "SELECT t1.column_id AS columnId, t1.column_name AS columnName,"
+ " t1.column_position AS columnPosition,"
+ " t1.metalake_id AS metalakeId, t1.catalog_id AS catalogId,"
+ " t1.schema_id AS schemaId, t1.table_id AS tableId,"
+ " t1.table_version AS tableVersion, t1.column_type AS columnType,"
Expand All @@ -50,15 +51,16 @@ public String insertColumnPOs(@Param("columnPOs") List<ColumnPO> columnPOs) {
return "<script>"
+ "INSERT INTO "
+ TableColumnMapper.COLUMN_TABLE_NAME
+ "(column_id, column_name, metalake_id, catalog_id, schema_id, table_id, table_version,"
+ " column_type, column_comment, column_nullable, column_auto_increment, "
+ "(column_id, column_name, column_position, metalake_id, catalog_id, schema_id,"
+ " table_id, table_version,"
+ " column_type, column_comment, column_nullable, column_auto_increment,"
+ " column_default_value, column_op_type, deleted_at, audit_info)"
+ " VALUES "
+ "<foreach collection='columnPOs' item='item' separator=','>"
+ "(#{item.columnId}, #{item.columnName}, #{item.metalakeId}, #{item.catalogId},"
+ " #{item.schemaId}, #{item.tableId}, #{item.tableVersion}, #{item.columnType}, "
+ " #{item.columnComment}, #{item.nullable}, #{item.autoIncrement}, #{item.defaultValue},"
+ " #{item.columnOpType}, #{item.deletedAt}, #{item.auditInfo})"
+ "(#{item.columnId}, #{item.columnName}, #{item.columnPosition}, #{item.metalakeId},"
+ " #{item.catalogId}, #{item.schemaId}, #{item.tableId}, #{item.tableVersion},"
+ " #{item.columnType}, #{item.columnComment}, #{item.nullable}, #{item.autoIncrement},"
+ " #{item.defaultValue}, #{item.columnOpType}, #{item.deletedAt}, #{item.auditInfo})"
+ "</foreach>"
+ "</script>";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ public static AutoIncrement fromBoolean(boolean autoIncrement) {

private String columnName;

private Integer columnPosition;

private Long metalakeId;

private Long catalogId;
Expand Down Expand Up @@ -177,6 +179,11 @@ public Builder withColumnName(String columnName) {
return this;
}

public Builder withColumnPosition(Integer columnPosition) {
columnPO.columnPosition = columnPosition;
return this;
}

public Builder withMetalakeId(Long metalakeId) {
columnPO.metalakeId = metalakeId;
return this;
Expand Down Expand Up @@ -247,6 +254,7 @@ public ColumnPO build() {
Preconditions.checkArgument(
StringUtils.isNotBlank(columnPO.columnName),
"Column name is required and cannot be blank");
Preconditions.checkArgument(columnPO.columnPosition != null, "Column position is required");
Preconditions.checkArgument(columnPO.metalakeId != null, "Metalake id is required");
Preconditions.checkArgument(columnPO.catalogId != null, "Catalog id is required");
Preconditions.checkArgument(columnPO.schemaId != null, "Schema id is required");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,14 @@ void updateColumnPOsFromTableDiff(
List<ColumnPO> columnPOsToInsert = Lists.newArrayList();
for (ColumnEntity newColumn : newColumns.values()) {
ColumnEntity oldColumn = oldColumns.get(newColumn.id());
// If the column is not existed in old columns, or if the column is updated, mark it as UPDATE
if (oldColumn == null || !oldColumn.equals(newColumn)) {
columnPOsToInsert.add(
POConverters.initializeColumnPO(newTablePO, newColumn, ColumnPO.ColumnOpType.UPDATE));
}
}

// Mark the columns to DELETE if they are not existed in new columns.
for (ColumnEntity oldColumn : oldColumns.values()) {
if (!newColumns.containsKey(oldColumn.id())) {
columnPOsToInsert.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ public static ColumnEntity fromColumnPO(ColumnPO columnPO) {
return ColumnEntity.builder()
.withId(columnPO.getColumnId())
.withName(columnPO.getColumnName())
.withPosition(columnPO.getColumnPosition())
.withDataType(JsonUtils.anyFieldMapper().readValue(columnPO.getColumnType(), Type.class))
.withComment(columnPO.getColumnComment())
.withAutoIncrement(
Expand Down Expand Up @@ -482,6 +483,7 @@ public static ColumnPO initializeColumnPO(
return ColumnPO.builder()
.withColumnId(columnEntity.id())
.withColumnName(columnEntity.name())
.withColumnPosition(columnEntity.position())
.withMetalakeId(tablePO.getMetalakeId())
.withCatalogId(tablePO.getCatalogId())
.withSchemaId(tablePO.getSchemaId())
Expand Down
23 changes: 23 additions & 0 deletions core/src/test/java/org/apache/gravitino/TestColumn.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,40 @@
@ToString
public class TestColumn extends BaseColumn {

private int position;

private TestColumn() {}

public int position() {
return position;
}

public void setPosition(int position) {
this.position = position;
}

public static class Builder extends BaseColumn.BaseColumnBuilder<Builder, TestColumn> {

private Integer position;

/** Creates a new instance of {@link Builder}. */
private Builder() {}

public Builder withPosition(int position) {
this.position = position;
return this;
}

@Override
protected TestColumn internalBuild() {
TestColumn column = new TestColumn();

if (position == null) {
throw new IllegalArgumentException("Position is required");
}

column.name = name;
column.position = position;
column.comment = comment;
column.dataType = dataType;
column.nullable = nullable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,16 @@ public void testNameCaseInsensitive() {
NameIdentifier tableIdent = NameIdentifier.of(tableNs, "tableNAME");
Column[] columns =
new Column[] {
TestColumn.builder().withName("colNAME1").withType(Types.StringType.get()).build(),
TestColumn.builder().withName("colNAME2").withType(Types.StringType.get()).build()
TestColumn.builder()
.withName("colNAME1")
.withPosition(0)
.withType(Types.StringType.get())
.build(),
TestColumn.builder()
.withName("colNAME2")
.withPosition(1)
.withType(Types.StringType.get())
.build()
};
RangePartition assignedPartition =
Partitions.range(
Expand Down Expand Up @@ -143,8 +151,16 @@ public void testNameSpec() {
NameIdentifier.of(tableNs, MetadataObjects.METADATA_OBJECT_RESERVED_NAME);
Column[] columns =
new Column[] {
TestColumn.builder().withName("colNAME1").withType(Types.StringType.get()).build(),
TestColumn.builder().withName("colNAME2").withType(Types.StringType.get()).build()
TestColumn.builder()
.withName("colNAME1")
.withPosition(0)
.withType(Types.StringType.get())
.build(),
TestColumn.builder()
.withName("colNAME2")
.withPosition(1)
.withType(Types.StringType.get())
.build()
};
Exception exception =
Assertions.assertThrows(
Expand All @@ -166,6 +182,7 @@ public void testNameSpec() {
new Column[] {
TestColumn.builder()
.withName(MetadataObjects.METADATA_OBJECT_RESERVED_NAME)
.withPosition(0)
.withType(Types.StringType.get())
.build()
};
Expand Down
Loading

0 comments on commit cf32e54

Please sign in to comment.