From 3448dca2c8394b83214be79cc8d9f8f925095af6 Mon Sep 17 00:00:00 2001 From: Peidian li <38486782+coolderli@users.noreply.github.com> Date: Wed, 24 Jul 2024 21:05:46 +0800 Subject: [PATCH] [#3981]feat(flink-connector):Support alter table operation for hive (#4097) ### What changes were proposed in this pull request? - Support alter table operation for hive table. ### Why are the changes needed? - Fix: #3981 ### Does this PR introduce _any_ user-facing change? - no ### How was this patch tested? - add UTs --- .../flink/connector/catalog/BaseCatalog.java | 167 ++++++++++- .../flink/connector/utils/TableUtils.java | 44 +++ .../flink/connector/utils/TypeUtils.java | 4 + .../connector/catalog/TestBaseCatalog.java | 67 +++++ .../integration/test/FlinkCommonIT.java | 281 +++++++++++++++++- .../flink/connector/utils/TestTypeUtils.java | 3 + 6 files changed, 563 insertions(+), 3 deletions(-) create mode 100644 flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TableUtils.java diff --git a/flink-connector/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java index 7fdc8f10836..6b76e31b8d2 100644 --- a/flink-connector/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java +++ b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java @@ -20,11 +20,13 @@ package org.apache.gravitino.flink.connector.catalog; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.MapDifference; import com.google.common.collect.Maps; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.compress.utils.Lists; @@ -68,6 +70,7 @@ import org.apache.gravitino.exceptions.TableAlreadyExistsException; import org.apache.gravitino.flink.connector.PartitionConverter; import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.utils.TableUtils; import org.apache.gravitino.flink.connector.utils.TypeUtils; import org.apache.gravitino.rel.Column; import org.apache.gravitino.rel.Table; @@ -254,6 +257,8 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor @Override public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + Preconditions.checkArgument( + table instanceof ResolvedCatalogBaseTable, "table should be resolved"); NameIdentifier identifier = NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName()); @@ -280,10 +285,72 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig } } + /** + * The method only is used to change the comments. To alter columns, use the other alterTable API + * and provide a list of TableChanges. + * + * @param tablePath path of the table or view to be modified + * @param newTable the new table definition + * @param ignoreIfNotExists flag to specify behavior when the table or view does not exist: if set + * to false, throw an exception, if set to true, do nothing. + * @throws TableNotExistException if the table not exists. + * @throws CatalogException in case of any runtime exception. + */ @Override - public void alterTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean b) + public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); + CatalogBaseTable existingTable; + + try { + existingTable = this.getTable(tablePath); + } catch (TableNotExistException e) { + if (!ignoreIfNotExists) { + throw e; + } + return; + } + + if (existingTable.getTableKind() != newTable.getTableKind()) { + throw new CatalogException( + String.format( + "Table types don't match. Existing table is '%s' and new table is '%s'.", + existingTable.getTableKind(), newTable.getTableKind())); + } + + NameIdentifier identifier = + NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName()); + catalog() + .asTableCatalog() + .alterTable(identifier, getGravitinoTableChanges(existingTable, newTable)); + } + + @Override + public void alterTable( + ObjectPath tablePath, + CatalogBaseTable newTable, + List tableChanges, + boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + CatalogBaseTable existingTable; + try { + existingTable = this.getTable(tablePath); + } catch (TableNotExistException e) { + if (!ignoreIfNotExists) { + throw e; + } + return; + } + + if (existingTable.getTableKind() != newTable.getTableKind()) { + throw new CatalogException( + String.format( + "Table types don't match. Existing table is '%s' and new table is '%s'.", + existingTable.getTableKind(), newTable.getTableKind())); + } + + NameIdentifier identifier = + NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName()); + catalog().asTableCatalog().alterTable(identifier, getGravitinoTableChanges(tableChanges)); } @Override @@ -470,6 +537,102 @@ private Column toGravitinoColumn(org.apache.flink.table.catalog.Column column) { null); } + private static void removeProperty( + org.apache.flink.table.catalog.TableChange.ResetOption change, List changes) { + changes.add(TableChange.removeProperty(change.getKey())); + } + + private static void setProperty( + org.apache.flink.table.catalog.TableChange.SetOption change, List changes) { + changes.add(TableChange.setProperty(change.getKey(), change.getValue())); + } + + private static void dropColumn( + org.apache.flink.table.catalog.TableChange.DropColumn change, List changes) { + changes.add(TableChange.deleteColumn(new String[] {change.getColumnName()}, true)); + } + + private static void addColumn( + org.apache.flink.table.catalog.TableChange.AddColumn change, List changes) { + changes.add( + TableChange.addColumn( + new String[] {change.getColumn().getName()}, + TypeUtils.toGravitinoType(change.getColumn().getDataType().getLogicalType()), + change.getColumn().getComment().orElse(null), + TableUtils.toGravitinoColumnPosition(change.getPosition()))); + } + + private static void modifyColumn( + org.apache.flink.table.catalog.TableChange change, List changes) { + if (change instanceof org.apache.flink.table.catalog.TableChange.ModifyColumnName) { + org.apache.flink.table.catalog.TableChange.ModifyColumnName modifyColumnName = + (org.apache.flink.table.catalog.TableChange.ModifyColumnName) change; + changes.add( + TableChange.renameColumn( + new String[] {modifyColumnName.getOldColumnName()}, + modifyColumnName.getNewColumnName())); + } else if (change + instanceof org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType) { + org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType modifyColumnType = + (org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType) change; + changes.add( + TableChange.updateColumnType( + new String[] {modifyColumnType.getOldColumn().getName()}, + TypeUtils.toGravitinoType(modifyColumnType.getNewType().getLogicalType()))); + } else if (change instanceof org.apache.flink.table.catalog.TableChange.ModifyColumnPosition) { + org.apache.flink.table.catalog.TableChange.ModifyColumnPosition modifyColumnPosition = + (org.apache.flink.table.catalog.TableChange.ModifyColumnPosition) change; + changes.add( + TableChange.updateColumnPosition( + new String[] {modifyColumnPosition.getOldColumn().getName()}, + TableUtils.toGravitinoColumnPosition(modifyColumnPosition.getNewPosition()))); + } else if (change instanceof org.apache.flink.table.catalog.TableChange.ModifyColumnComment) { + org.apache.flink.table.catalog.TableChange.ModifyColumnComment modifyColumnComment = + (org.apache.flink.table.catalog.TableChange.ModifyColumnComment) change; + changes.add( + TableChange.updateColumnComment( + new String[] {modifyColumnComment.getOldColumn().getName()}, + modifyColumnComment.getNewComment())); + } else { + throw new IllegalArgumentException( + String.format("Not support ModifyColumn : %s", change.getClass())); + } + } + + @VisibleForTesting + static TableChange[] getGravitinoTableChanges( + CatalogBaseTable existingTable, CatalogBaseTable newTable) { + Preconditions.checkNotNull(newTable.getComment(), "The new comment should not be null"); + List changes = Lists.newArrayList(); + if (!Objects.equals(newTable.getComment(), existingTable.getComment())) { + changes.add(TableChange.updateComment(newTable.getComment())); + } + return changes.toArray(new TableChange[0]); + } + + @VisibleForTesting + static TableChange[] getGravitinoTableChanges( + List tableChanges) { + List changes = Lists.newArrayList(); + for (org.apache.flink.table.catalog.TableChange change : tableChanges) { + if (change instanceof org.apache.flink.table.catalog.TableChange.AddColumn) { + addColumn((org.apache.flink.table.catalog.TableChange.AddColumn) change, changes); + } else if (change instanceof org.apache.flink.table.catalog.TableChange.DropColumn) { + dropColumn((org.apache.flink.table.catalog.TableChange.DropColumn) change, changes); + } else if (change instanceof org.apache.flink.table.catalog.TableChange.ModifyColumn) { + modifyColumn(change, changes); + } else if (change instanceof org.apache.flink.table.catalog.TableChange.SetOption) { + setProperty((org.apache.flink.table.catalog.TableChange.SetOption) change, changes); + } else if (change instanceof org.apache.flink.table.catalog.TableChange.ResetOption) { + removeProperty((org.apache.flink.table.catalog.TableChange.ResetOption) change, changes); + } else { + throw new UnsupportedOperationException( + String.format("Not supported change : %s", change.getClass())); + } + } + return changes.toArray(new TableChange[0]); + } + @VisibleForTesting static SchemaChange[] getSchemaChange(CatalogDatabase current, CatalogDatabase updated) { Map currentProperties = current.getProperties(); diff --git a/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TableUtils.java b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TableUtils.java new file mode 100644 index 00000000000..68bae41807c --- /dev/null +++ b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TableUtils.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.utils; + +import org.apache.gravitino.rel.TableChange; + +public class TableUtils { + private TableUtils() {} + + public static TableChange.ColumnPosition toGravitinoColumnPosition( + org.apache.flink.table.catalog.TableChange.ColumnPosition columnPosition) { + if (columnPosition == null) { + return null; + } + + if (columnPosition instanceof org.apache.flink.table.catalog.TableChange.First) { + return TableChange.ColumnPosition.first(); + } else if (columnPosition instanceof org.apache.flink.table.catalog.TableChange.After) { + org.apache.flink.table.catalog.TableChange.After after = + (org.apache.flink.table.catalog.TableChange.After) columnPosition; + return TableChange.ColumnPosition.after(after.column()); + } else { + throw new IllegalArgumentException( + String.format("Not support column position : %s", columnPosition.getClass())); + } + } +} diff --git a/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java index a11e20cb31f..7c56350375b 100644 --- a/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java +++ b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java @@ -37,6 +37,8 @@ public static Type toGravitinoType(LogicalType logicalType) { return Types.DoubleType.get(); case INTEGER: return Types.IntegerType.get(); + case BIGINT: + return Types.LongType.get(); default: throw new UnsupportedOperationException( "Not support type: " + logicalType.asSummaryString()); @@ -51,6 +53,8 @@ public static DataType toFlinkType(Type gravitinoType) { return DataTypes.STRING(); case INTEGER: return DataTypes.INT(); + case LONG: + return DataTypes.BIGINT(); default: throw new UnsupportedOperationException("Not support " + gravitinoType.toString()); } diff --git a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/catalog/TestBaseCatalog.java b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/catalog/TestBaseCatalog.java index a016a9603d0..89d25f8bd01 100644 --- a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/catalog/TestBaseCatalog.java +++ b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/catalog/TestBaseCatalog.java @@ -18,11 +18,20 @@ */ package org.apache.gravitino.flink.connector.catalog; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import java.util.List; import java.util.Map; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.TableChange; import org.apache.gravitino.SchemaChange; +import org.apache.gravitino.rel.types.Types; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -49,4 +58,62 @@ public void testHiveSchemaChanges() { Assertions.assertEquals("key2", ((SchemaChange.SetProperty) schemaChange[2]).getProperty()); Assertions.assertEquals("new-value2", ((SchemaChange.SetProperty) schemaChange[2]).getValue()); } + + @Test + public void testTableChanges() { + List tableChanges = + ImmutableList.of( + TableChange.add(Column.physical("test", DataTypes.INT())), + TableChange.modifyPhysicalColumnType( + Column.physical("test", DataTypes.INT()), DataTypes.DOUBLE()), + TableChange.modifyColumnName(Column.physical("test", DataTypes.INT()), "test2"), + TableChange.dropColumn("aaa"), + TableChange.modifyColumnComment( + Column.physical("test", DataTypes.INT()), "new comment"), + TableChange.modifyColumnPosition( + Column.physical("test", DataTypes.INT()), + TableChange.ColumnPosition.after("test2")), + TableChange.modifyColumnPosition( + Column.physical("test", DataTypes.INT()), TableChange.ColumnPosition.first()), + TableChange.set("key", "value"), + TableChange.reset("key")); + + List expected = + ImmutableList.of( + org.apache.gravitino.rel.TableChange.addColumn( + new String[] {"test"}, Types.IntegerType.get()), + org.apache.gravitino.rel.TableChange.updateColumnType( + new String[] {"test"}, Types.DoubleType.get()), + org.apache.gravitino.rel.TableChange.renameColumn(new String[] {"test"}, "test2"), + org.apache.gravitino.rel.TableChange.deleteColumn(new String[] {"aaa"}, true), + org.apache.gravitino.rel.TableChange.updateColumnComment( + new String[] {"test"}, "new comment"), + org.apache.gravitino.rel.TableChange.updateColumnPosition( + new String[] {"test"}, + org.apache.gravitino.rel.TableChange.ColumnPosition.after("test2")), + org.apache.gravitino.rel.TableChange.updateColumnPosition( + new String[] {"test"}, org.apache.gravitino.rel.TableChange.ColumnPosition.first()), + org.apache.gravitino.rel.TableChange.setProperty("key", "value"), + org.apache.gravitino.rel.TableChange.removeProperty("key")); + + org.apache.gravitino.rel.TableChange[] gravitinoTableChanges = + BaseCatalog.getGravitinoTableChanges(tableChanges); + Assertions.assertArrayEquals(expected.toArray(), gravitinoTableChanges); + } + + @Test + public void testTableChangesWithoutColumnChange() { + Schema schema = Schema.newBuilder().column("test", "INT").build(); + CatalogBaseTable table = + CatalogTable.of( + schema, "test", ImmutableList.of(), ImmutableMap.of("key", "value", "key2", "value2")); + CatalogBaseTable newTable = + CatalogTable.of( + schema, "new comment", ImmutableList.of(), ImmutableMap.of("key", "new value")); + org.apache.gravitino.rel.TableChange[] tableChanges = + BaseCatalog.getGravitinoTableChanges(table, newTable); + List expected = + ImmutableList.of(org.apache.gravitino.rel.TableChange.updateComment("new comment")); + Assertions.assertArrayEquals(expected.toArray(), tableChanges); + } } diff --git a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java index 06905fff980..46b1d1d23d8 100644 --- a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java +++ b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java @@ -22,15 +22,25 @@ import static org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.assertColumns; import static org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.toFlinkPhysicalColumn; import static org.apache.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM; +import static org.junit.jupiter.api.Assertions.fail; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.Optional; +import org.apache.commons.compress.utils.Lists; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.ResultKind; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.types.Row; import org.apache.gravitino.Catalog; @@ -325,9 +335,278 @@ public void testGetSimpleTable() { CatalogTable catalogTable = (CatalogTable) table; Assertions.assertFalse(catalogTable.isPartitioned()); } catch (TableNotExistException e) { - Assertions.fail(e); + fail(e); } }, true); } + + @Test + public void testRenameColumn() { + String databaseName = "test_renam_column_db"; + String tableName = "test_rename_column"; + doWithSchema( + currentCatalog(), + databaseName, + catalog -> { + TableResult result = + sql( + "CREATE TABLE %s " + + "(user_id INT COMMENT 'USER_ID', " + + " order_amount DOUBLE COMMENT 'ORDER_AMOUNT')" + + " COMMENT 'test comment'", + tableName); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + + result = sql("ALTER TABLE %s RENAME user_id TO user_id_new", tableName); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + + Column[] actual = + catalog + .asTableCatalog() + .loadTable(NameIdentifier.of(databaseName, tableName)) + .columns(); + Column[] expected = + new Column[] { + Column.of("user_id_new", Types.IntegerType.get(), "USER_ID"), + Column.of("order_amount", Types.DoubleType.get(), "ORDER_AMOUNT") + }; + assertColumns(expected, actual); + }, + true); + } + + @Test + public void testAlterTableComment() { + String databaseName = "test_alter_table_comment_database"; + String tableName = "test_alter_table_comment"; + String newComment = "new_table_comment"; + doWithSchema( + currentCatalog(), + databaseName, + catalog -> { + Optional flinkCatalog = + tableEnv.getCatalog(currentCatalog().name()); + if (flinkCatalog.isPresent()) { + org.apache.flink.table.catalog.Catalog currentFlinkCatalog = flinkCatalog.get(); + ObjectPath currentTablePath = new ObjectPath(databaseName, tableName); + try { + // use java api to create a new table + org.apache.flink.table.api.Schema schema = + org.apache.flink.table.api.Schema.newBuilder() + .column("test", DataTypes.INT()) + .build(); + CatalogTable newTable = + CatalogTable.of(schema, "test comment", ImmutableList.of(), ImmutableMap.of()); + List columns = Lists.newArrayList(); + columns.add(org.apache.flink.table.catalog.Column.physical("test", DataTypes.INT())); + ResolvedSchema resolvedSchema = new ResolvedSchema(columns, new ArrayList<>(), null); + currentFlinkCatalog.createTable( + currentTablePath, new ResolvedCatalogTable(newTable, resolvedSchema), false); + CatalogTable table = (CatalogTable) currentFlinkCatalog.getTable(currentTablePath); + + // alter table comment + currentFlinkCatalog.alterTable( + currentTablePath, + CatalogTable.of( + table.getUnresolvedSchema(), + newComment, + table.getPartitionKeys(), + table.getOptions()), + false); + + CatalogTable loadedTable = + (CatalogTable) currentFlinkCatalog.getTable(currentTablePath); + Assertions.assertEquals(newComment, loadedTable.getComment()); + Table gravitinoTable = + currentCatalog() + .asTableCatalog() + .loadTable(NameIdentifier.of(databaseName, tableName)); + Assertions.assertEquals(newComment, gravitinoTable.comment()); + } catch (DatabaseNotExistException + | TableAlreadyExistException + | TableNotExistException e) { + fail(e); + } + } else { + fail("Catalog doesn't exist"); + } + }, + true); + } + + @Test + public void testAlterTableAddColumn() { + String databaseName = "test_alter_table_add_column_db"; + String tableName = "test_alter_table_add_column"; + doWithSchema( + currentCatalog(), + databaseName, + catalog -> { + TableResult result = + sql( + "CREATE TABLE %s " + + "(user_id INT COMMENT 'USER_ID', " + + " order_amount INT COMMENT 'ORDER_AMOUNT')" + + " COMMENT 'test comment'", + tableName); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + result = sql("ALTER TABLE %s ADD new_column_2 INT AFTER order_amount", tableName); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + + Column[] actual = + catalog + .asTableCatalog() + .loadTable(NameIdentifier.of(databaseName, tableName)) + .columns(); + Column[] expected = + new Column[] { + Column.of("user_id", Types.IntegerType.get(), "USER_ID"), + Column.of("order_amount", Types.IntegerType.get(), "ORDER_AMOUNT"), + Column.of("new_column_2", Types.IntegerType.get(), null), + }; + assertColumns(expected, actual); + }, + true); + } + + @Test + public void testAlterTableDropColumn() { + String databaseName = "test_alter_table_drop_column_db"; + String tableName = "test_alter_table_drop_column"; + doWithSchema( + currentCatalog(), + databaseName, + catalog -> { + TableResult result = + sql( + "CREATE TABLE %s " + + "(user_id INT COMMENT 'USER_ID', " + + " order_amount INT COMMENT 'ORDER_AMOUNT')" + + " COMMENT 'test comment'", + tableName); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + result = sql("ALTER TABLE %s DROP user_id", tableName); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + Column[] actual = + catalog + .asTableCatalog() + .loadTable(NameIdentifier.of(databaseName, tableName)) + .columns(); + Column[] expected = + new Column[] {Column.of("order_amount", Types.IntegerType.get(), "ORDER_AMOUNT")}; + assertColumns(expected, actual); + }, + true); + } + + @Test + public void testAlterColumnTypeAndChangeOrder() { + String databaseName = "test_alter_table_alter_column_db"; + String tableName = "test_alter_table_rename_column"; + doWithSchema( + currentCatalog(), + databaseName, + catalog -> { + TableResult result = + sql( + "CREATE TABLE %s " + + "(user_id BIGINT COMMENT 'USER_ID', " + + " order_amount INT COMMENT 'ORDER_AMOUNT')" + + " COMMENT 'test comment'" + + " WITH (" + + "'%s' = '%s')", + tableName, "test key", "test value"); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + result = + sql("ALTER TABLE %s MODIFY order_amount BIGINT COMMENT 'new comment2'", tableName); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + result = + sql( + "ALTER TABLE %s MODIFY user_id BIGINT COMMENT 'new comment' AFTER order_amount", + tableName); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + Column[] actual = + catalog + .asTableCatalog() + .loadTable(NameIdentifier.of(databaseName, tableName)) + .columns(); + Column[] expected = + new Column[] { + Column.of("order_amount", Types.LongType.get(), "new comment2"), + Column.of("user_id", Types.LongType.get(), "new comment") + }; + assertColumns(expected, actual); + }, + true); + } + + @Test + public void testRenameTable() { + String databaseName = "test_rename_table_db"; + String tableName = "test_rename_table"; + doWithSchema( + currentCatalog(), + databaseName, + catalog -> { + TableResult result = + sql( + "CREATE TABLE %s " + + "(user_id INT COMMENT 'USER_ID', " + + " order_amount INT COMMENT 'ORDER_AMOUNT')" + + " COMMENT 'test comment'", + tableName); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + String newTableName = "new_rename_table_name"; + result = sql("ALTER TABLE %s RENAME TO %s", tableName, newTableName); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + Assertions.assertFalse( + catalog.asTableCatalog().tableExists(NameIdentifier.of(databaseName, tableName))); + Assertions.assertTrue( + catalog.asTableCatalog().tableExists(NameIdentifier.of(databaseName, newTableName))); + }, + true); + } + + @Test + public void testAlterTableProperties() { + String databaseName = "test_alter_table_properties_db"; + String tableName = "test_alter_table_properties"; + doWithSchema( + currentCatalog(), + databaseName, + catalog -> { + TableResult result = + sql( + "CREATE TABLE %s " + + "(user_id INT COMMENT 'USER_ID', " + + " order_amount INT COMMENT 'ORDER_AMOUNT')" + + " COMMENT 'test comment'" + + " WITH (" + + "'%s' = '%s')", + tableName, "key", "value"); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + result = sql("ALTER TABLE %s SET ('key2' = 'value2', 'key' = 'value1')", tableName); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + Map properties = + catalog + .asTableCatalog() + .loadTable(NameIdentifier.of(databaseName, tableName)) + .properties(); + + Assertions.assertEquals("value1", properties.get("key")); + Assertions.assertEquals("value2", properties.get("key2")); + result = sql("ALTER TABLE %s RESET ('key2')", tableName); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + + properties = + catalog + .asTableCatalog() + .loadTable(NameIdentifier.of(databaseName, tableName)) + .properties(); + Assertions.assertEquals("value1", properties.get("key")); + Assertions.assertNull(properties.get("key2")); + }, + true); + } } diff --git a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java index e6d78f114a2..e9b1f834339 100644 --- a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java +++ b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java @@ -21,6 +21,7 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.DoubleType; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.UnresolvedUserDefinedType; @@ -37,6 +38,7 @@ public void testToGravitinoType() { Types.StringType.get(), TypeUtils.toGravitinoType(new VarCharType(Integer.MAX_VALUE))); Assertions.assertEquals(Types.DoubleType.get(), TypeUtils.toGravitinoType(new DoubleType())); Assertions.assertEquals(Types.IntegerType.get(), TypeUtils.toGravitinoType(new IntType())); + Assertions.assertEquals(Types.LongType.get(), TypeUtils.toGravitinoType(new BigIntType())); Assertions.assertThrows( UnsupportedOperationException.class, () -> @@ -49,6 +51,7 @@ public void testToFlinkType() { Assertions.assertEquals(DataTypes.DOUBLE(), TypeUtils.toFlinkType(Types.DoubleType.get())); Assertions.assertEquals(DataTypes.STRING(), TypeUtils.toFlinkType(Types.StringType.get())); Assertions.assertEquals(DataTypes.INT(), TypeUtils.toFlinkType(Types.IntegerType.get())); + Assertions.assertEquals(DataTypes.BIGINT(), TypeUtils.toFlinkType(Types.LongType.get())); Assertions.assertThrows( UnsupportedOperationException.class, () -> TypeUtils.toFlinkType(Types.UnparsedType.of("unknown")));