From b288049394b33553660fda4e617475db285f6a9d Mon Sep 17 00:00:00 2001 From: caican00 Date: Thu, 7 Mar 2024 21:24:06 +0800 Subject: [PATCH] [#2447] feat(spark-connector): support alter table AddColumn and DropColumn for spark-connector --- .../integration/test/spark/SparkIT.java | 12 ++++----- .../connector/catalog/GravitinoCatalog.java | 16 ++++++------ .../catalog/TestTransformTableChange.java | 25 +++++++++++-------- 3 files changed, 28 insertions(+), 25 deletions(-) diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkIT.java index 2892bf358d0..a8e482351e9 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkIT.java @@ -286,20 +286,20 @@ void testAlterTableAddAndDeleteColumn() { createSimpleTable(tableName); List sparkOldColumnInfos = getTableInfo(tableName).getColumns(); sparkOldColumnInfos.forEach( - sparkColumnInfo -> - Assertions.assertFalse("col1".equalsIgnoreCase(sparkColumnInfo.getName()))); + sparkColumnInfo -> + Assertions.assertFalse("col1".equalsIgnoreCase(sparkColumnInfo.getName()))); sql(String.format("ALTER TABLE %S ADD COLUMNS (col1 string)", tableName)); List sparkAddColumnInfos = getTableInfo(tableName).getColumns(); Assertions.assertTrue( - sparkAddColumnInfos.stream() - .anyMatch(sparkColumnInfo -> "col1".equalsIgnoreCase(sparkColumnInfo.getName()))); + sparkAddColumnInfos.stream() + .anyMatch(sparkColumnInfo -> "col1".equalsIgnoreCase(sparkColumnInfo.getName()))); sql(String.format("ALTER TABLE %S DROP COLUMNS (col1)", tableName)); List sparkDeleteColumnInfos = getTableInfo(tableName).getColumns(); sparkDeleteColumnInfos.forEach( - sparkColumnInfo -> - Assertions.assertFalse("col1".equalsIgnoreCase(sparkColumnInfo.getName()))); + sparkColumnInfo -> + Assertions.assertFalse("col1".equalsIgnoreCase(sparkColumnInfo.getName()))); } private void checkTableReadWrite(SparkTableInfo table) { diff --git a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalog.java b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalog.java index 45b257c6587..63414a0cbce 100644 --- a/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalog.java +++ b/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalog.java @@ -364,23 +364,23 @@ static com.datastrato.gravitino.rel.TableChange transformTableChange(TableChange } else if (change instanceof TableChange.AddColumn) { TableChange.AddColumn addColumn = (TableChange.AddColumn) change; return com.datastrato.gravitino.rel.TableChange.addColumn( - addColumn.fieldNames(), - SparkTypeConverter.toGravitinoType(addColumn.dataType()), - addColumn.comment(), - transformColumnPosition(addColumn.position()), - addColumn.isNullable()); + addColumn.fieldNames(), + SparkTypeConverter.toGravitinoType(addColumn.dataType()), + addColumn.comment(), + transformColumnPosition(addColumn.position()), + addColumn.isNullable()); } else if (change instanceof TableChange.DeleteColumn) { TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) change; return com.datastrato.gravitino.rel.TableChange.deleteColumn( - deleteColumn.fieldNames(), deleteColumn.ifExists()); + deleteColumn.fieldNames(), deleteColumn.ifExists()); } else { throw new UnsupportedOperationException( - String.format("Unsupported table change %s", change.getClass().getName())); + String.format("Unsupported table change %s", change.getClass().getName())); } } private static com.datastrato.gravitino.rel.TableChange.ColumnPosition transformColumnPosition( - TableChange.ColumnPosition columnPosition) { + TableChange.ColumnPosition columnPosition) { if (columnPosition instanceof TableChange.First) { return com.datastrato.gravitino.rel.TableChange.ColumnPosition.first(); } else if (columnPosition instanceof TableChange.After) { diff --git a/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/catalog/TestTransformTableChange.java b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/catalog/TestTransformTableChange.java index 3eb1c454770..49be8314d26 100644 --- a/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/catalog/TestTransformTableChange.java +++ b/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/catalog/TestTransformTableChange.java @@ -6,6 +6,7 @@ package com.datastrato.gravitino.spark.connector.catalog; import java.util.Arrays; +import java.util.Locale; import org.apache.spark.sql.connector.catalog.TableChange; import org.apache.spark.sql.types.DataTypes; import org.junit.jupiter.api.Assertions; @@ -42,39 +43,41 @@ void testTransformRemoveProperty() { void testTransformAddColumn() { TableChange sparkChange = TableChange.addColumn(new String[] {"col1"}, DataTypes.StringType); com.datastrato.gravitino.rel.TableChange gravitinoChange = - GravitinoCatalog.transformTableChange(sparkChange); + GravitinoCatalog.transformTableChange(sparkChange); TableChange.AddColumn sparkAddColumn = (TableChange.AddColumn) sparkChange; Assertions.assertTrue( - gravitinoChange instanceof com.datastrato.gravitino.rel.TableChange.AddColumn); + gravitinoChange instanceof com.datastrato.gravitino.rel.TableChange.AddColumn); com.datastrato.gravitino.rel.TableChange.AddColumn gravitinoAddColumn = - (com.datastrato.gravitino.rel.TableChange.AddColumn) gravitinoChange; + (com.datastrato.gravitino.rel.TableChange.AddColumn) gravitinoChange; Assertions.assertEquals(1, sparkAddColumn.fieldNames().length); Assertions.assertEquals(1, gravitinoAddColumn.fieldName().length); Assertions.assertEquals( - Arrays.stream(sparkAddColumn.fieldNames()).findFirst(), - Arrays.stream(gravitinoAddColumn.fieldName()).findFirst()); - Assertions.assertEquals(sparkAddColumn.dataType(), gravitinoAddColumn.getDataType()); + Arrays.stream(sparkAddColumn.fieldNames()).findFirst(), + Arrays.stream(gravitinoAddColumn.fieldName()).findFirst()); + Assertions.assertEquals( + sparkAddColumn.dataType().typeName().toLowerCase(Locale.ROOT), + gravitinoAddColumn.getDataType().simpleString().toLowerCase(Locale.ROOT)); } @Test void testTransformDeleteColumn() { TableChange sparkChange = TableChange.deleteColumn(new String[] {"col1"}, true); com.datastrato.gravitino.rel.TableChange gravitinoChange = - GravitinoCatalog.transformTableChange(sparkChange); + GravitinoCatalog.transformTableChange(sparkChange); TableChange.DeleteColumn sparkDeleteColumn = (TableChange.DeleteColumn) sparkChange; Assertions.assertTrue( - gravitinoChange instanceof com.datastrato.gravitino.rel.TableChange.DeleteColumn); + gravitinoChange instanceof com.datastrato.gravitino.rel.TableChange.DeleteColumn); com.datastrato.gravitino.rel.TableChange.DeleteColumn gravitinoDeleteColumn = - (com.datastrato.gravitino.rel.TableChange.DeleteColumn) gravitinoChange; + (com.datastrato.gravitino.rel.TableChange.DeleteColumn) gravitinoChange; Assertions.assertEquals(1, sparkDeleteColumn.fieldNames().length); Assertions.assertEquals(1, gravitinoDeleteColumn.fieldName().length); Assertions.assertEquals( - Arrays.stream(sparkDeleteColumn.fieldNames()).findFirst(), - Arrays.stream(gravitinoDeleteColumn.fieldName()).findFirst()); + Arrays.stream(sparkDeleteColumn.fieldNames()).findFirst(), + Arrays.stream(gravitinoDeleteColumn.fieldName()).findFirst()); Assertions.assertEquals(sparkDeleteColumn.ifExists(), gravitinoDeleteColumn.getIfExists()); } }