Skip to content

Commit

Permalink
[apache#2447] feat(spark-connector): support alter table AddColumn an…
Browse files Browse the repository at this point in the history
…d DropColumn for spark-connector
  • Loading branch information
caican00 committed Mar 7, 2024
1 parent 95569ca commit 4ccf7be
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,30 @@ void testListTable() {
Assertions.assertThrows(NoSuchNamespaceException.class, () -> listTableNames("not_exists_db"));
}

@Test
void testAlterTableAddAndDeleteColumn() {
String tableName = "test_column";
dropTableIfExists(tableName);

createSimpleTable(tableName);
List<SparkColumnInfo> sparkOldColumnInfos = getTableInfo(tableName).getColumns();
sparkOldColumnInfos.forEach(
sparkColumnInfo ->
Assertions.assertFalse("col1".equalsIgnoreCase(sparkColumnInfo.getName())));

sql(String.format("ALTER TABLE %S ADD COLUMNS (col1 string)", tableName));
List<SparkColumnInfo> sparkAddColumnInfos = getTableInfo(tableName).getColumns();
Assertions.assertTrue(
sparkAddColumnInfos.stream()
.anyMatch(sparkColumnInfo -> "col1".equalsIgnoreCase(sparkColumnInfo.getName())));

sql(String.format("ALTER TABLE %S DROP COLUMNS (col1)", tableName));
List<SparkColumnInfo> sparkDeleteColumnInfos = getTableInfo(tableName).getColumns();
sparkDeleteColumnInfos.forEach(
sparkColumnInfo ->
Assertions.assertFalse("col1".equalsIgnoreCase(sparkColumnInfo.getName())));
}

private void checkTableReadWrite(SparkTableInfo table) {
String name = table.getTableIdentifier();
String insertValues =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.datastrato.gravitino.spark.connector.GravitinoCatalogAdaptorFactory;
import com.datastrato.gravitino.spark.connector.PropertiesConverter;
import com.datastrato.gravitino.spark.connector.SparkTypeConverter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -164,7 +165,21 @@ public Table createTable(

@Override
public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
throw new NotSupportedException("Doesn't support altering table for now");
com.datastrato.gravitino.rel.TableChange[] gravitinoTableChanges =
Arrays.stream(changes)
.map(GravitinoCatalog::transformTableChange)
.toArray(com.datastrato.gravitino.rel.TableChange[]::new);
try {
com.datastrato.gravitino.rel.Table table =
gravitinoCatalogClient
.asTableCatalog()
.alterTable(
NameIdentifier.of(metalakeName, catalogName, getDatabase(ident), ident.name()),
gravitinoTableChanges);
return gravitinoAdaptor.createSparkTable(ident, table, sparkCatalog, propertiesConverter);
} catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(ident);
}
}

@Override
Expand Down Expand Up @@ -336,4 +351,36 @@ private String getDatabase(NameIdentifier gravitinoIdentifier) {
"Only support 3 level namespace," + gravitinoIdentifier.namespace());
return gravitinoIdentifier.namespace().level(2);
}

@VisibleForTesting
static com.datastrato.gravitino.rel.TableChange transformTableChange(TableChange change) {
if (change instanceof TableChange.AddColumn) {
TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
return com.datastrato.gravitino.rel.TableChange.addColumn(
addColumn.fieldNames(),
SparkTypeConverter.toGravitinoType(addColumn.dataType()),
addColumn.comment(),
transformColumnPosition(addColumn.position()),
addColumn.isNullable());
} else if (change instanceof TableChange.DeleteColumn) {
TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) change;
return com.datastrato.gravitino.rel.TableChange.deleteColumn(
deleteColumn.fieldNames(), deleteColumn.ifExists());
} else {
throw new UnsupportedOperationException(
String.format("Unsupported table change %s", change.getClass().getName()));
}
}

private static com.datastrato.gravitino.rel.TableChange.ColumnPosition transformColumnPosition(
TableChange.ColumnPosition columnPosition) {
if (columnPosition instanceof TableChange.First) {
return com.datastrato.gravitino.rel.TableChange.ColumnPosition.first();
} else if (columnPosition instanceof TableChange.After) {
TableChange.After after = (TableChange.After) columnPosition;
return com.datastrato.gravitino.rel.TableChange.ColumnPosition.after(after.column());
} else {
return com.datastrato.gravitino.rel.TableChange.ColumnPosition.defaultPos();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.spark.connector.catalog;

import java.util.Arrays;
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.types.DataTypes;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestTransformTableChange {

@Test
void testTransformAddColumn() {
TableChange sparkChange = TableChange.addColumn(new String[] {"col1"}, DataTypes.StringType);
com.datastrato.gravitino.rel.TableChange gravitinoChange =
GravitinoCatalog.transformTableChange(sparkChange);

TableChange.AddColumn sparkAddColumn = (TableChange.AddColumn) sparkChange;
Assertions.assertTrue(
gravitinoChange instanceof com.datastrato.gravitino.rel.TableChange.AddColumn);
com.datastrato.gravitino.rel.TableChange.AddColumn gravitinoAddColumn =
(com.datastrato.gravitino.rel.TableChange.AddColumn) gravitinoChange;

Assertions.assertEquals(1, sparkAddColumn.fieldNames().length);
Assertions.assertEquals(1, gravitinoAddColumn.fieldName().length);
Assertions.assertEquals(
Arrays.stream(sparkAddColumn.fieldNames()).findFirst(),
Arrays.stream(gravitinoAddColumn.fieldName()).findFirst());
Assertions.assertEquals(sparkAddColumn.dataType(), gravitinoAddColumn.getDataType());
}

@Test
void testTransformDeleteColumn() {
TableChange sparkChange = TableChange.deleteColumn(new String[] {"col1"}, true);
com.datastrato.gravitino.rel.TableChange gravitinoChange =
GravitinoCatalog.transformTableChange(sparkChange);

TableChange.DeleteColumn sparkDeleteColumn = (TableChange.DeleteColumn) sparkChange;
Assertions.assertTrue(
gravitinoChange instanceof com.datastrato.gravitino.rel.TableChange.DeleteColumn);
com.datastrato.gravitino.rel.TableChange.DeleteColumn gravitinoDeleteColumn =
(com.datastrato.gravitino.rel.TableChange.DeleteColumn) gravitinoChange;

Assertions.assertEquals(1, sparkDeleteColumn.fieldNames().length);
Assertions.assertEquals(1, gravitinoDeleteColumn.fieldName().length);
Assertions.assertEquals(
Arrays.stream(sparkDeleteColumn.fieldNames()).findFirst(),
Arrays.stream(gravitinoDeleteColumn.fieldName()).findFirst());
Assertions.assertEquals(sparkDeleteColumn.ifExists(), gravitinoDeleteColumn.getIfExists());
}
}

0 comments on commit 4ccf7be

Please sign in to comment.