Skip to content

Commit

Permalink
[apache#2447] feat(spark-connector): support alter table AddColumn an…
Browse files Browse the repository at this point in the history
…d DropColumn for spark-connector
  • Loading branch information
caican00 committed Mar 7, 2024
1 parent a08f396 commit b288049
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -286,20 +286,20 @@ void testAlterTableAddAndDeleteColumn() {
createSimpleTable(tableName);
List<SparkColumnInfo> sparkOldColumnInfos = getTableInfo(tableName).getColumns();
sparkOldColumnInfos.forEach(
sparkColumnInfo ->
Assertions.assertFalse("col1".equalsIgnoreCase(sparkColumnInfo.getName())));
sparkColumnInfo ->
Assertions.assertFalse("col1".equalsIgnoreCase(sparkColumnInfo.getName())));

sql(String.format("ALTER TABLE %S ADD COLUMNS (col1 string)", tableName));
List<SparkColumnInfo> sparkAddColumnInfos = getTableInfo(tableName).getColumns();
Assertions.assertTrue(
sparkAddColumnInfos.stream()
.anyMatch(sparkColumnInfo -> "col1".equalsIgnoreCase(sparkColumnInfo.getName())));
sparkAddColumnInfos.stream()
.anyMatch(sparkColumnInfo -> "col1".equalsIgnoreCase(sparkColumnInfo.getName())));

sql(String.format("ALTER TABLE %S DROP COLUMNS (col1)", tableName));
List<SparkColumnInfo> sparkDeleteColumnInfos = getTableInfo(tableName).getColumns();
sparkDeleteColumnInfos.forEach(
sparkColumnInfo ->
Assertions.assertFalse("col1".equalsIgnoreCase(sparkColumnInfo.getName())));
sparkColumnInfo ->
Assertions.assertFalse("col1".equalsIgnoreCase(sparkColumnInfo.getName())));
}

private void checkTableReadWrite(SparkTableInfo table) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,23 +364,23 @@ static com.datastrato.gravitino.rel.TableChange transformTableChange(TableChange
} else if (change instanceof TableChange.AddColumn) {
TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
return com.datastrato.gravitino.rel.TableChange.addColumn(
addColumn.fieldNames(),
SparkTypeConverter.toGravitinoType(addColumn.dataType()),
addColumn.comment(),
transformColumnPosition(addColumn.position()),
addColumn.isNullable());
addColumn.fieldNames(),
SparkTypeConverter.toGravitinoType(addColumn.dataType()),
addColumn.comment(),
transformColumnPosition(addColumn.position()),
addColumn.isNullable());
} else if (change instanceof TableChange.DeleteColumn) {
TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) change;
return com.datastrato.gravitino.rel.TableChange.deleteColumn(
deleteColumn.fieldNames(), deleteColumn.ifExists());
deleteColumn.fieldNames(), deleteColumn.ifExists());
} else {
throw new UnsupportedOperationException(
String.format("Unsupported table change %s", change.getClass().getName()));
String.format("Unsupported table change %s", change.getClass().getName()));
}
}

private static com.datastrato.gravitino.rel.TableChange.ColumnPosition transformColumnPosition(
TableChange.ColumnPosition columnPosition) {
TableChange.ColumnPosition columnPosition) {
if (columnPosition instanceof TableChange.First) {
return com.datastrato.gravitino.rel.TableChange.ColumnPosition.first();
} else if (columnPosition instanceof TableChange.After) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package com.datastrato.gravitino.spark.connector.catalog;

import java.util.Arrays;
import java.util.Locale;
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.types.DataTypes;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -42,39 +43,41 @@ void testTransformRemoveProperty() {
void testTransformAddColumn() {
TableChange sparkChange = TableChange.addColumn(new String[] {"col1"}, DataTypes.StringType);
com.datastrato.gravitino.rel.TableChange gravitinoChange =
GravitinoCatalog.transformTableChange(sparkChange);
GravitinoCatalog.transformTableChange(sparkChange);

TableChange.AddColumn sparkAddColumn = (TableChange.AddColumn) sparkChange;
Assertions.assertTrue(
gravitinoChange instanceof com.datastrato.gravitino.rel.TableChange.AddColumn);
gravitinoChange instanceof com.datastrato.gravitino.rel.TableChange.AddColumn);
com.datastrato.gravitino.rel.TableChange.AddColumn gravitinoAddColumn =
(com.datastrato.gravitino.rel.TableChange.AddColumn) gravitinoChange;
(com.datastrato.gravitino.rel.TableChange.AddColumn) gravitinoChange;

Assertions.assertEquals(1, sparkAddColumn.fieldNames().length);
Assertions.assertEquals(1, gravitinoAddColumn.fieldName().length);
Assertions.assertEquals(
Arrays.stream(sparkAddColumn.fieldNames()).findFirst(),
Arrays.stream(gravitinoAddColumn.fieldName()).findFirst());
Assertions.assertEquals(sparkAddColumn.dataType(), gravitinoAddColumn.getDataType());
Arrays.stream(sparkAddColumn.fieldNames()).findFirst(),
Arrays.stream(gravitinoAddColumn.fieldName()).findFirst());
Assertions.assertEquals(
sparkAddColumn.dataType().typeName().toLowerCase(Locale.ROOT),
gravitinoAddColumn.getDataType().simpleString().toLowerCase(Locale.ROOT));
}

@Test
void testTransformDeleteColumn() {
TableChange sparkChange = TableChange.deleteColumn(new String[] {"col1"}, true);
com.datastrato.gravitino.rel.TableChange gravitinoChange =
GravitinoCatalog.transformTableChange(sparkChange);
GravitinoCatalog.transformTableChange(sparkChange);

TableChange.DeleteColumn sparkDeleteColumn = (TableChange.DeleteColumn) sparkChange;
Assertions.assertTrue(
gravitinoChange instanceof com.datastrato.gravitino.rel.TableChange.DeleteColumn);
gravitinoChange instanceof com.datastrato.gravitino.rel.TableChange.DeleteColumn);
com.datastrato.gravitino.rel.TableChange.DeleteColumn gravitinoDeleteColumn =
(com.datastrato.gravitino.rel.TableChange.DeleteColumn) gravitinoChange;
(com.datastrato.gravitino.rel.TableChange.DeleteColumn) gravitinoChange;

Assertions.assertEquals(1, sparkDeleteColumn.fieldNames().length);
Assertions.assertEquals(1, gravitinoDeleteColumn.fieldName().length);
Assertions.assertEquals(
Arrays.stream(sparkDeleteColumn.fieldNames()).findFirst(),
Arrays.stream(gravitinoDeleteColumn.fieldName()).findFirst());
Arrays.stream(sparkDeleteColumn.fieldNames()).findFirst(),
Arrays.stream(gravitinoDeleteColumn.fieldName()).findFirst());
Assertions.assertEquals(sparkDeleteColumn.ifExists(), gravitinoDeleteColumn.getIfExists());
}
}

0 comments on commit b288049

Please sign in to comment.