Skip to content

Commit

Permalink
[#3981]feat(flink-connector):Support alter table operation for hive (#…
Browse files Browse the repository at this point in the history
…4097)

### What changes were proposed in this pull request?

- Support the alter table operation for Hive tables.

### Why are the changes needed?

- Fix: #3981 

### Does this PR introduce _any_ user-facing change?

- no

### How was this patch tested?

- Added unit tests (UTs).
  • Loading branch information
coolderli authored Jul 24, 2024
1 parent 79f1f7d commit 3448dca
Show file tree
Hide file tree
Showing 6 changed files with 563 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
package org.apache.gravitino.flink.connector.catalog;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.compress.utils.Lists;
Expand Down Expand Up @@ -68,6 +70,7 @@
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.utils.TableUtils;
import org.apache.gravitino.flink.connector.utils.TypeUtils;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
Expand Down Expand Up @@ -254,6 +257,8 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
Preconditions.checkArgument(
table instanceof ResolvedCatalogBaseTable, "table should be resolved");
NameIdentifier identifier =
NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName());

Expand All @@ -280,10 +285,72 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}
}

/**
* Alters an existing table using only the new table definition. The only difference that is
* propagated is the table comment; to alter columns or options, use the other alterTable API and
* provide a list of TableChanges.
*
* <p>The existing table is loaded first. If it is missing, the call either rethrows or returns
* silently depending on {@code ignoreIfNotExists}. The table kinds of the existing and the new
* table must match before any change is sent to the Gravitino table catalog.
*
* @param tablePath path of the table or view to be modified
* @param newTable the new table definition
* @param ignoreIfNotExists flag to specify behavior when the table or view does not exist: if set
* to false, throw an exception, if set to true, do nothing.
* @throws TableNotExistException if the table does not exist and ignoreIfNotExists is false.
* @throws CatalogException if the existing and new table kinds differ, or in case of any runtime
* exception.
*/
@Override
// NOTE(review): this text is a rendered diff. The first signature line below and the bare
// `throw new UnsupportedOperationException();` appear to be the removed pre-change lines —
// confirm against the actual source file.
public void alterTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean b)
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
CatalogBaseTable existingTable;

// Load the current definition; a missing table is tolerated only when the caller asked for it.
try {
existingTable = this.getTable(tablePath);
} catch (TableNotExistException e) {
if (!ignoreIfNotExists) {
throw e;
}
return;
}

// Refuse to alter when the existing and the new definition report different table kinds.
if (existingTable.getTableKind() != newTable.getTableKind()) {
throw new CatalogException(
String.format(
"Table types don't match. Existing table is '%s' and new table is '%s'.",
existingTable.getTableKind(), newTable.getTableKind()));
}

// Address the table by (database name, object name) and forward the computed changes.
NameIdentifier identifier =
NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName());
catalog()
.asTableCatalog()
.alterTable(identifier, getGravitinoTableChanges(existingTable, newTable));
}

/**
 * Alters an existing table by translating the given Flink table changes into Gravitino table
 * changes and forwarding them to the Gravitino table catalog.
 *
 * @param tablePath path of the table to be modified
 * @param newTable the new table definition; only its table kind is inspected here
 * @param tableChanges the Flink changes to translate and apply
 * @param ignoreIfNotExists if true, return silently when the table does not exist; if false,
 *     rethrow the {@link TableNotExistException}
 * @throws TableNotExistException if the table does not exist and ignoreIfNotExists is false
 * @throws CatalogException if the existing and new table kinds differ
 */
@Override
public void alterTable(
    ObjectPath tablePath,
    CatalogBaseTable newTable,
    List<org.apache.flink.table.catalog.TableChange> tableChanges,
    boolean ignoreIfNotExists)
    throws TableNotExistException, CatalogException {
  CatalogBaseTable currentTable;
  try {
    currentTable = this.getTable(tablePath);
  } catch (TableNotExistException notFound) {
    // A missing table is acceptable only when the caller opted in.
    if (ignoreIfNotExists) {
      return;
    }
    throw notFound;
  }

  boolean sameKind = currentTable.getTableKind() == newTable.getTableKind();
  if (!sameKind) {
    String message =
        String.format(
            "Table types don't match. Existing table is '%s' and new table is '%s'.",
            currentTable.getTableKind(), newTable.getTableKind());
    throw new CatalogException(message);
  }

  String databaseName = tablePath.getDatabaseName();
  String tableName = tablePath.getObjectName();
  NameIdentifier identifier = NameIdentifier.of(databaseName, tableName);
  catalog().asTableCatalog().alterTable(identifier, getGravitinoTableChanges(tableChanges));
}

@Override
Expand Down Expand Up @@ -470,6 +537,102 @@ private Column toGravitinoColumn(org.apache.flink.table.catalog.Column column) {
null);
}

/**
 * Appends a Gravitino remove-property change for the key carried by a Flink ResetOption.
 *
 * @param change the Flink reset-option change
 * @param changes the list that receives the converted Gravitino change
 */
private static void removeProperty(
    org.apache.flink.table.catalog.TableChange.ResetOption change, List<TableChange> changes) {
  String optionKey = change.getKey();
  TableChange removal = TableChange.removeProperty(optionKey);
  changes.add(removal);
}

/**
 * Appends a Gravitino set-property change for the key/value carried by a Flink SetOption.
 *
 * @param change the Flink set-option change
 * @param changes the list that receives the converted Gravitino change
 */
private static void setProperty(
    org.apache.flink.table.catalog.TableChange.SetOption change, List<TableChange> changes) {
  String optionKey = change.getKey();
  String optionValue = change.getValue();
  changes.add(TableChange.setProperty(optionKey, optionValue));
}

/**
 * Appends a Gravitino delete-column change for the column named by a Flink DropColumn.
 *
 * @param change the Flink drop-column change
 * @param changes the list that receives the converted Gravitino change
 */
private static void dropColumn(
    org.apache.flink.table.catalog.TableChange.DropColumn change, List<TableChange> changes) {
  String[] fieldName = new String[] {change.getColumnName()};
  // The trailing `true` is presumably Gravitino's if-exists flag — verify against its API.
  TableChange deletion = TableChange.deleteColumn(fieldName, true);
  changes.add(deletion);
}

/**
 * Appends a Gravitino add-column change built from a Flink AddColumn: the column name, its
 * logical type converted through {@code TypeUtils}, its comment (or null when absent) and its
 * position converted through {@code TableUtils}.
 *
 * @param change the Flink add-column change
 * @param changes the list that receives the converted Gravitino change
 */
private static void addColumn(
    org.apache.flink.table.catalog.TableChange.AddColumn change, List<TableChange> changes) {
  org.apache.flink.table.catalog.Column flinkColumn = change.getColumn();
  String[] fieldName = new String[] {flinkColumn.getName()};
  TableChange addition =
      TableChange.addColumn(
          fieldName,
          TypeUtils.toGravitinoType(flinkColumn.getDataType().getLogicalType()),
          flinkColumn.getComment().orElse(null),
          TableUtils.toGravitinoColumnPosition(change.getPosition()));
  changes.add(addition);
}

/**
 * Converts one Flink modify-column change into the matching Gravitino change and appends it.
 *
 * <p>Handled kinds are, in the order checked: column rename, physical column type update, column
 * position update and column comment update.
 *
 * @param change the Flink change to convert
 * @param changes the list that receives the converted Gravitino change
 * @throws IllegalArgumentException if the change is none of the handled kinds
 */
private static void modifyColumn(
    org.apache.flink.table.catalog.TableChange change, List<TableChange> changes) {
  if (change instanceof org.apache.flink.table.catalog.TableChange.ModifyColumnName) {
    org.apache.flink.table.catalog.TableChange.ModifyColumnName rename =
        (org.apache.flink.table.catalog.TableChange.ModifyColumnName) change;
    String[] oldName = new String[] {rename.getOldColumnName()};
    changes.add(TableChange.renameColumn(oldName, rename.getNewColumnName()));
    return;
  }

  if (change instanceof org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType) {
    org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType retype =
        (org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType) change;
    String[] columnName = new String[] {retype.getOldColumn().getName()};
    changes.add(
        TableChange.updateColumnType(
            columnName, TypeUtils.toGravitinoType(retype.getNewType().getLogicalType())));
    return;
  }

  if (change instanceof org.apache.flink.table.catalog.TableChange.ModifyColumnPosition) {
    org.apache.flink.table.catalog.TableChange.ModifyColumnPosition reposition =
        (org.apache.flink.table.catalog.TableChange.ModifyColumnPosition) change;
    String[] columnName = new String[] {reposition.getOldColumn().getName()};
    changes.add(
        TableChange.updateColumnPosition(
            columnName, TableUtils.toGravitinoColumnPosition(reposition.getNewPosition())));
    return;
  }

  if (change instanceof org.apache.flink.table.catalog.TableChange.ModifyColumnComment) {
    org.apache.flink.table.catalog.TableChange.ModifyColumnComment recomment =
        (org.apache.flink.table.catalog.TableChange.ModifyColumnComment) change;
    String[] columnName = new String[] {recomment.getOldColumn().getName()};
    changes.add(TableChange.updateColumnComment(columnName, recomment.getNewComment()));
    return;
  }

  // Nothing matched: report the concrete class so the unsupported kind is identifiable.
  throw new IllegalArgumentException(
      String.format("Not support ModifyColumn : %s", change.getClass()));
}

/**
 * Computes the Gravitino changes needed to move from the existing table definition to the new
 * one. Only the table comment is compared.
 *
 * @param existingTable the table as currently stored
 * @param newTable the desired table definition; its comment must not be null
 * @return an array holding a single update-comment change when the comments differ, otherwise an
 *     empty array
 */
@VisibleForTesting
static TableChange[] getGravitinoTableChanges(
    CatalogBaseTable existingTable, CatalogBaseTable newTable) {
  Preconditions.checkNotNull(newTable.getComment(), "The new comment should not be null");
  boolean commentUnchanged = Objects.equals(newTable.getComment(), existingTable.getComment());
  if (commentUnchanged) {
    return new TableChange[0];
  }
  return new TableChange[] {TableChange.updateComment(newTable.getComment())};
}

/**
 * Converts a single Flink table change and appends the result to {@code converted}.
 *
 * @throws UnsupportedOperationException if the change kind is not handled
 */
private static void appendGravitinoChange(
    org.apache.flink.table.catalog.TableChange flinkChange, List<TableChange> converted) {
  if (flinkChange instanceof org.apache.flink.table.catalog.TableChange.AddColumn) {
    addColumn((org.apache.flink.table.catalog.TableChange.AddColumn) flinkChange, converted);
    return;
  }
  if (flinkChange instanceof org.apache.flink.table.catalog.TableChange.DropColumn) {
    dropColumn((org.apache.flink.table.catalog.TableChange.DropColumn) flinkChange, converted);
    return;
  }
  if (flinkChange instanceof org.apache.flink.table.catalog.TableChange.ModifyColumn) {
    modifyColumn(flinkChange, converted);
    return;
  }
  if (flinkChange instanceof org.apache.flink.table.catalog.TableChange.SetOption) {
    setProperty((org.apache.flink.table.catalog.TableChange.SetOption) flinkChange, converted);
    return;
  }
  if (flinkChange instanceof org.apache.flink.table.catalog.TableChange.ResetOption) {
    removeProperty(
        (org.apache.flink.table.catalog.TableChange.ResetOption) flinkChange, converted);
    return;
  }
  throw new UnsupportedOperationException(
      String.format("Not supported change : %s", flinkChange.getClass()));
}

/**
 * Translates Flink table changes into Gravitino table changes, preserving their order.
 *
 * <p>Add-column, drop-column, modify-column, set-option and reset-option changes are handled.
 *
 * @param tableChanges the Flink changes to translate
 * @return the translated Gravitino changes
 * @throws UnsupportedOperationException if a change is of any other kind
 */
@VisibleForTesting
static TableChange[] getGravitinoTableChanges(
    List<org.apache.flink.table.catalog.TableChange> tableChanges) {
  List<TableChange> converted = Lists.newArrayList();
  for (org.apache.flink.table.catalog.TableChange flinkChange : tableChanges) {
    appendGravitinoChange(flinkChange, converted);
  }
  return converted.toArray(new TableChange[0]);
}

@VisibleForTesting
static SchemaChange[] getSchemaChange(CatalogDatabase current, CatalogDatabase updated) {
Map<String, String> currentProperties = current.getProperties();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.flink.connector.utils;

import org.apache.gravitino.rel.TableChange;

/** Static helpers for translating Flink table-change descriptors into Gravitino ones. */
public class TableUtils {
  // Not instantiable: the class only exposes static helpers.
  private TableUtils() {}

  /**
   * Converts a Flink column position into the equivalent Gravitino column position.
   *
   * @param columnPosition the Flink position, may be null
   * @return null when the input is null, a "first" position for a Flink First, or an "after"
   *     position referencing the same column for a Flink After
   * @throws IllegalArgumentException if the position is of any other kind
   */
  public static TableChange.ColumnPosition toGravitinoColumnPosition(
      org.apache.flink.table.catalog.TableChange.ColumnPosition columnPosition) {
    if (columnPosition == null) {
      return null;
    }

    if (columnPosition instanceof org.apache.flink.table.catalog.TableChange.First) {
      return TableChange.ColumnPosition.first();
    }

    if (!(columnPosition instanceof org.apache.flink.table.catalog.TableChange.After)) {
      throw new IllegalArgumentException(
          String.format("Not support column position : %s", columnPosition.getClass()));
    }

    org.apache.flink.table.catalog.TableChange.After afterPosition =
        (org.apache.flink.table.catalog.TableChange.After) columnPosition;
    return TableChange.ColumnPosition.after(afterPosition.column());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public static Type toGravitinoType(LogicalType logicalType) {
return Types.DoubleType.get();
case INTEGER:
return Types.IntegerType.get();
case BIGINT:
return Types.LongType.get();
default:
throw new UnsupportedOperationException(
"Not support type: " + logicalType.asSummaryString());
Expand All @@ -51,6 +53,8 @@ public static DataType toFlinkType(Type gravitinoType) {
return DataTypes.STRING();
case INTEGER:
return DataTypes.INT();
case LONG:
return DataTypes.BIGINT();
default:
throw new UnsupportedOperationException("Not support " + gravitinoType.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,20 @@
*/
package org.apache.gravitino.flink.connector.catalog;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.TableChange;
import org.apache.gravitino.SchemaChange;
import org.apache.gravitino.rel.types.Types;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand All @@ -49,4 +58,62 @@ public void testHiveSchemaChanges() {
Assertions.assertEquals("key2", ((SchemaChange.SetProperty) schemaChange[2]).getProperty());
Assertions.assertEquals("new-value2", ((SchemaChange.SetProperty) schemaChange[2]).getValue());
}

/**
 * Verifies that each supported Flink table change is translated, in order, into the expected
 * Gravitino table change.
 */
@Test
public void testTableChanges() {
  // Flink columns are plain value holders here, so one instance is shared by all changes.
  Column testColumn = Column.physical("test", DataTypes.INT());

  List<TableChange> flinkChanges =
      ImmutableList.of(
          TableChange.add(testColumn),
          TableChange.modifyPhysicalColumnType(testColumn, DataTypes.DOUBLE()),
          TableChange.modifyColumnName(testColumn, "test2"),
          TableChange.dropColumn("aaa"),
          TableChange.modifyColumnComment(testColumn, "new comment"),
          TableChange.modifyColumnPosition(testColumn, TableChange.ColumnPosition.after("test2")),
          TableChange.modifyColumnPosition(testColumn, TableChange.ColumnPosition.first()),
          TableChange.set("key", "value"),
          TableChange.reset("key"));

  org.apache.gravitino.rel.TableChange[] expected = {
    org.apache.gravitino.rel.TableChange.addColumn(
        new String[] {"test"}, Types.IntegerType.get()),
    org.apache.gravitino.rel.TableChange.updateColumnType(
        new String[] {"test"}, Types.DoubleType.get()),
    org.apache.gravitino.rel.TableChange.renameColumn(new String[] {"test"}, "test2"),
    org.apache.gravitino.rel.TableChange.deleteColumn(new String[] {"aaa"}, true),
    org.apache.gravitino.rel.TableChange.updateColumnComment(
        new String[] {"test"}, "new comment"),
    org.apache.gravitino.rel.TableChange.updateColumnPosition(
        new String[] {"test"}, org.apache.gravitino.rel.TableChange.ColumnPosition.after("test2")),
    org.apache.gravitino.rel.TableChange.updateColumnPosition(
        new String[] {"test"}, org.apache.gravitino.rel.TableChange.ColumnPosition.first()),
    org.apache.gravitino.rel.TableChange.setProperty("key", "value"),
    org.apache.gravitino.rel.TableChange.removeProperty("key")
  };

  org.apache.gravitino.rel.TableChange[] actual =
      BaseCatalog.getGravitinoTableChanges(flinkChanges);
  Assertions.assertArrayEquals(expected, actual);
}

/**
 * Verifies that comparing two table definitions yields only an update-comment change, even
 * though their options differ as well.
 */
@Test
public void testTableChangesWithoutColumnChange() {
  Schema schema = Schema.newBuilder().column("test", "INT").build();

  Map<String, String> originalOptions = ImmutableMap.of("key", "value", "key2", "value2");
  CatalogBaseTable originalTable =
      CatalogTable.of(schema, "test", ImmutableList.of(), originalOptions);

  Map<String, String> updatedOptions = ImmutableMap.of("key", "new value");
  CatalogBaseTable updatedTable =
      CatalogTable.of(schema, "new comment", ImmutableList.of(), updatedOptions);

  org.apache.gravitino.rel.TableChange[] actual =
      BaseCatalog.getGravitinoTableChanges(originalTable, updatedTable);
  org.apache.gravitino.rel.TableChange[] expected = {
    org.apache.gravitino.rel.TableChange.updateComment("new comment")
  };
  Assertions.assertArrayEquals(expected, actual);
}
}
Loading

0 comments on commit 3448dca

Please sign in to comment.