Skip to content

Commit

Permalink
[#2447] feat(spark-connector): support alter table AddColumn and Drop…
Browse files Browse the repository at this point in the history
…Column for spark-connector (#2458)

### What changes were proposed in this pull request?

support alterTable addColumn and dropColumn for spark-connector in
AlterTableCommand.


### Why are the changes needed?

Implement addColumn and dropColumn ops for Spark AlterTableCommand.

Fix: [#2447](#2447)

### Does this PR introduce _any_ user-facing change?

Yes, users can run the following commands to modify the table columns
using spark sql.

```
ALTER TABLE StudentInfo ADD columns (col1 string);

ALTER TABLE StudentInfo DROP columns (col1);
```

### How was this patch tested?

New uts.
  • Loading branch information
caican00 authored Mar 10, 2024
1 parent 1bc35a3 commit 40c327b
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo.SparkColumnInfo;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfoChecker;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -278,6 +279,34 @@ void testAlterTableSetAndRemoveProperty() {
Assertions.assertTrue(newProperties.containsKey("key2"));
}

@Test
void testAlterTableAddAndDeleteColumn() {
String tableName = "test_column";
dropTableIfExists(tableName);

List<SparkColumnInfo> simpleTableColumns = getSimpleTableColumn();

createSimpleTable(tableName);
checkTableColumns(tableName, simpleTableColumns, getTableInfo(tableName));

sql(String.format("ALTER TABLE %S ADD COLUMNS (col1 string)", tableName));
ArrayList<SparkColumnInfo> addColumns = new ArrayList<>(simpleTableColumns);
addColumns.add(SparkColumnInfo.of("col1", DataTypes.StringType, null));
checkTableColumns(tableName, addColumns, getTableInfo(tableName));

sql(String.format("ALTER TABLE %S DROP COLUMNS (col1)", tableName));
checkTableColumns(tableName, simpleTableColumns, getTableInfo(tableName));
}

private void checkTableColumns(
String tableName, List<SparkColumnInfo> columnInfos, SparkTableInfo tableInfo) {
SparkTableInfoChecker.create()
.withName(tableName)
.withColumns(columnInfos)
.withComment(null)
.check(tableInfo);
}

private void checkTableReadWrite(SparkTableInfo table) {
String name = table.getTableIdentifier();
String insertValues =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,9 +361,37 @@ static com.datastrato.gravitino.rel.TableChange transformTableChange(TableChange
} else if (change instanceof TableChange.RemoveProperty) {
TableChange.RemoveProperty removeProperty = (TableChange.RemoveProperty) change;
return com.datastrato.gravitino.rel.TableChange.removeProperty(removeProperty.property());
} else if (change instanceof TableChange.AddColumn) {
TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
return com.datastrato.gravitino.rel.TableChange.addColumn(
addColumn.fieldNames(),
SparkTypeConverter.toGravitinoType(addColumn.dataType()),
addColumn.comment(),
transformColumnPosition(addColumn.position()),
addColumn.isNullable());
} else if (change instanceof TableChange.DeleteColumn) {
TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) change;
return com.datastrato.gravitino.rel.TableChange.deleteColumn(
deleteColumn.fieldNames(), deleteColumn.ifExists());
} else {
throw new UnsupportedOperationException(
String.format("Unsupported table change %s", change.getClass().getName()));
}
}

private static com.datastrato.gravitino.rel.TableChange.ColumnPosition transformColumnPosition(
TableChange.ColumnPosition columnPosition) {
if (null == columnPosition) {
return com.datastrato.gravitino.rel.TableChange.ColumnPosition.defaultPos();
} else if (columnPosition instanceof TableChange.First) {
return com.datastrato.gravitino.rel.TableChange.ColumnPosition.first();
} else if (columnPosition instanceof TableChange.After) {
TableChange.After after = (TableChange.After) columnPosition;
return com.datastrato.gravitino.rel.TableChange.ColumnPosition.after(after.column());
} else {
throw new UnsupportedOperationException(
String.format(
"Unsupported table column position %s", columnPosition.getClass().getName()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

package com.datastrato.gravitino.spark.connector.catalog;

import org.apache.spark.sql.connector.catalog.ColumnDefaultValue;
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.expressions.LiteralValue;
import org.apache.spark.sql.types.DataTypes;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -35,4 +38,88 @@ void testTransformRemoveProperty() {
(com.datastrato.gravitino.rel.TableChange.RemoveProperty) tableChange;
Assertions.assertEquals("key", gravitinoRemoveProperty.getProperty());
}

@Test
void testTransformAddColumn() {

TableChange.ColumnPosition first = TableChange.ColumnPosition.first();
TableChange.ColumnPosition after = TableChange.ColumnPosition.after("col0");
ColumnDefaultValue defaultValue =
new ColumnDefaultValue(
"CURRENT_DEFAULT", new LiteralValue("default_value", DataTypes.StringType));

TableChange.AddColumn sparkAddColumnFirst =
(TableChange.AddColumn)
TableChange.addColumn(
new String[] {"col1"}, DataTypes.StringType, true, "", first, defaultValue);
com.datastrato.gravitino.rel.TableChange gravitinoChangeFirst =
GravitinoCatalog.transformTableChange(sparkAddColumnFirst);

Assertions.assertTrue(
gravitinoChangeFirst instanceof com.datastrato.gravitino.rel.TableChange.AddColumn);
com.datastrato.gravitino.rel.TableChange.AddColumn gravitinoAddColumnFirst =
(com.datastrato.gravitino.rel.TableChange.AddColumn) gravitinoChangeFirst;

Assertions.assertEquals(sparkAddColumnFirst.fieldNames(), gravitinoAddColumnFirst.fieldName());
Assertions.assertTrue(
"string".equalsIgnoreCase(gravitinoAddColumnFirst.getDataType().simpleString()));
Assertions.assertTrue(
gravitinoAddColumnFirst.getPosition()
instanceof com.datastrato.gravitino.rel.TableChange.First);

TableChange.AddColumn sparkAddColumnAfter =
(TableChange.AddColumn)
TableChange.addColumn(
new String[] {"col1"}, DataTypes.StringType, true, "", after, defaultValue);
com.datastrato.gravitino.rel.TableChange gravitinoChangeAfter =
GravitinoCatalog.transformTableChange(sparkAddColumnAfter);

Assertions.assertTrue(
gravitinoChangeAfter instanceof com.datastrato.gravitino.rel.TableChange.AddColumn);
com.datastrato.gravitino.rel.TableChange.AddColumn gravitinoAddColumnAfter =
(com.datastrato.gravitino.rel.TableChange.AddColumn) gravitinoChangeAfter;

Assertions.assertEquals(sparkAddColumnAfter.fieldNames(), gravitinoAddColumnAfter.fieldName());
Assertions.assertTrue(
"string".equalsIgnoreCase(gravitinoAddColumnAfter.getDataType().simpleString()));
Assertions.assertTrue(
gravitinoAddColumnAfter.getPosition()
instanceof com.datastrato.gravitino.rel.TableChange.After);

TableChange.AddColumn sparkAddColumnDefault =
(TableChange.AddColumn)
TableChange.addColumn(
new String[] {"col1"}, DataTypes.StringType, true, "", null, defaultValue);
com.datastrato.gravitino.rel.TableChange gravitinoChangeDefault =
GravitinoCatalog.transformTableChange(sparkAddColumnDefault);

Assertions.assertTrue(
gravitinoChangeDefault instanceof com.datastrato.gravitino.rel.TableChange.AddColumn);
com.datastrato.gravitino.rel.TableChange.AddColumn gravitinoAddColumnDefault =
(com.datastrato.gravitino.rel.TableChange.AddColumn) gravitinoChangeDefault;

Assertions.assertEquals(
sparkAddColumnDefault.fieldNames(), gravitinoAddColumnDefault.fieldName());
Assertions.assertTrue(
"string".equalsIgnoreCase(gravitinoAddColumnDefault.getDataType().simpleString()));
Assertions.assertTrue(
gravitinoAddColumnDefault.getPosition()
instanceof com.datastrato.gravitino.rel.TableChange.Default);
}

@Test
void testTransformDeleteColumn() {
TableChange.DeleteColumn sparkDeleteColumn =
(TableChange.DeleteColumn) TableChange.deleteColumn(new String[] {"col1"}, true);
com.datastrato.gravitino.rel.TableChange gravitinoChange =
GravitinoCatalog.transformTableChange(sparkDeleteColumn);

Assertions.assertTrue(
gravitinoChange instanceof com.datastrato.gravitino.rel.TableChange.DeleteColumn);
com.datastrato.gravitino.rel.TableChange.DeleteColumn gravitinoDeleteColumn =
(com.datastrato.gravitino.rel.TableChange.DeleteColumn) gravitinoChange;

Assertions.assertEquals(sparkDeleteColumn.fieldNames(), gravitinoDeleteColumn.fieldName());
Assertions.assertEquals(sparkDeleteColumn.ifExists(), gravitinoDeleteColumn.getIfExists());
}
}

0 comments on commit 40c327b

Please sign in to comment.