Skip to content

Commit

Permalink
[#3916] feat(catalog-lakehouse-paimon): Support alter table for Paimo…
Browse files Browse the repository at this point in the history
…n Catalog (#4428)

### What changes were proposed in this pull request?

Support alter table for Paimon Catalog.

### Why are the changes needed?

Fix: #3916

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New UTs and ITs.

Co-authored-by: cai can <[email protected]>
Co-authored-by: caican <[email protected]>
  • Loading branch information
3 people authored Aug 8, 2024
1 parent 6279630 commit 9b243bf
Show file tree
Hide file tree
Showing 8 changed files with 1,011 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.SchemaChange;
Expand All @@ -43,6 +46,7 @@
import org.apache.gravitino.connector.SupportsSchemas;
import org.apache.gravitino.exceptions.ConnectionFailedException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.exceptions.NoSuchColumnException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NoSuchTableException;
import org.apache.gravitino.exceptions.NonEmptySchemaException;
Expand All @@ -52,6 +56,7 @@
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.TableCatalog;
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.TableChange.RenameTable;
import org.apache.gravitino.rel.expressions.NamedReference;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
Expand Down Expand Up @@ -84,6 +89,8 @@ public class PaimonCatalogOperations implements CatalogOperations, SupportsSchem
"Paimon schema (database) %s already exists.";
private static final String NO_SUCH_TABLE_EXCEPTION = "Paimon table %s does not exist.";
private static final String TABLE_ALREADY_EXISTS_EXCEPTION = "Paimon table %s already exists.";
private static final String NO_SUCH_COLUMN_EXCEPTION =
"Paimon column of table %s does not exist.";

/**
* Initializes the Paimon catalog operations with the provided configuration.
Expand Down Expand Up @@ -402,7 +409,25 @@ public GravitinoPaimonTable createTable(
@Override
public GravitinoPaimonTable alterTable(NameIdentifier identifier, TableChange... changes)
throws NoSuchTableException, IllegalArgumentException {
throw new UnsupportedOperationException("alterTable is unsupported now for Paimon Catalog.");
Optional<TableChange> renameTableOpt =
Arrays.stream(changes)
.filter(tableChange -> tableChange instanceof RenameTable)
.reduce((a, b) -> b);
if (renameTableOpt.isPresent()) {
String otherChanges =
Arrays.stream(changes)
.filter(tableChange -> !(tableChange instanceof RenameTable))
.map(String::valueOf)
.collect(Collectors.joining("\n"));
Preconditions.checkArgument(
StringUtils.isEmpty(otherChanges),
String.format(
"The operation to change the table name cannot be performed together with other operations. "
+ "The list of operations that you cannot perform includes: \n%s",
otherChanges));
return renameTable(identifier, (RenameTable) renameTableOpt.get());
}
return internalAlterTable(identifier, changes);
}

/**
Expand Down Expand Up @@ -470,4 +495,55 @@ private void checkPaimonIndexes(Index[] indexes) {
index.type() == Index.IndexType.PRIMARY_KEY,
"Paimon only supports primary key Index."));
}

/**
* Performs rename table change with the provided identifier.
*
* @param identifier The identifier of the table to rename.
* @param renameTable Table Change to modify the table name.
* @return The renamed {@link GravitinoPaimonTable} instance.
* @throws NoSuchTableException If the table with the provided identifier does not exist.
* @throws IllegalArgumentException This exception will not be thrown in this method.
*/
private GravitinoPaimonTable renameTable(
NameIdentifier identifier, TableChange.RenameTable renameTable)
throws NoSuchTableException, IllegalArgumentException {
NameIdentifier newNnameIdentifier =
NameIdentifier.of(identifier.namespace(), renameTable.getNewName());
NameIdentifier oldIdentifier = buildPaimonNameIdentifier(identifier);
NameIdentifier newIdentifier = buildPaimonNameIdentifier(newNnameIdentifier);
try {
paimonCatalogOps.renameTable(oldIdentifier.toString(), newIdentifier.toString());
} catch (Catalog.TableNotExistException e) {
throw new NoSuchTableException(e, NO_SUCH_TABLE_EXCEPTION, oldIdentifier);
} catch (Catalog.TableAlreadyExistException e) {
throw new TableAlreadyExistsException(e, TABLE_ALREADY_EXISTS_EXCEPTION, newIdentifier);
}
return loadTable(newNnameIdentifier);
}

/**
* Performs alter table changes with the provided identifier according to the specified {@link
* TableChange} changes.
*
* @param identifier The identifier of the table to alter.
* @param changes The changes to apply to the table.
* @return The altered {@link GravitinoPaimonTable} instance.
* @throws NoSuchTableException If the table with the provided identifier does not exist.
* @throws IllegalArgumentException This exception will not be thrown in this method.
*/
private GravitinoPaimonTable internalAlterTable(NameIdentifier identifier, TableChange... changes)
throws NoSuchTableException, IllegalArgumentException {
NameIdentifier paimonNameIdentifier = buildPaimonNameIdentifier(identifier);
try {
paimonCatalogOps.alterTable(paimonNameIdentifier.toString(), changes);
} catch (Catalog.TableNotExistException e) {
throw new NoSuchTableException(e, NO_SUCH_TABLE_EXCEPTION, paimonNameIdentifier);
} catch (Catalog.ColumnNotExistException e) {
throw new NoSuchColumnException(e, NO_SUCH_COLUMN_EXCEPTION, paimonNameIdentifier);
} catch (Exception e) {
throw new RuntimeException(e);
}
return loadTable(identifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,27 @@
public class PaimonTablePropertiesMetadata extends BasePropertiesMetadata {

public static final String COMMENT = "comment";
public static final String CREATOR = "creator";
public static final String OWNER = "owner";
public static final String BUCKET_KEY = "bucket-key";
public static final String MERGE_ENGINE = "merge-engine";
public static final String SEQUENCE_FIELD = "sequence.field";
public static final String ROWKIND_FIELD = "rowkind.field";
public static final String PRIMARY_KEY = "primary-key";
public static final String PARTITION = "partition";

private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;

static {
List<PropertyEntry<?>> propertyEntries =
ImmutableList.of(
stringReservedPropertyEntry(COMMENT, "The table comment", true),
stringReservedPropertyEntry(CREATOR, "The table creator", false));
stringReservedPropertyEntry(OWNER, "The table owner", false),
stringReservedPropertyEntry(BUCKET_KEY, "The table bucket key", false),
stringReservedPropertyEntry(MERGE_ENGINE, "The table merge engine", false),
stringReservedPropertyEntry(SEQUENCE_FIELD, "The table sequence field", false),
stringReservedPropertyEntry(ROWKIND_FIELD, "The table rowkind field", false),
stringReservedPropertyEntry(PRIMARY_KEY, "The table primary key", false),
stringReservedPropertyEntry(PARTITION, "The table partition", false));
PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@
package org.apache.gravitino.catalog.lakehouse.paimon.ops;

import static org.apache.gravitino.catalog.lakehouse.paimon.utils.CatalogUtils.loadCatalogBackend;
import static org.apache.gravitino.catalog.lakehouse.paimon.utils.TableOpsUtils.buildSchemaChanges;

import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig;
import org.apache.gravitino.rel.TableChange;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Catalog.ColumnAlreadyExistException;
import org.apache.paimon.catalog.Catalog.ColumnNotExistException;
import org.apache.paimon.catalog.Catalog.DatabaseAlreadyExistException;
import org.apache.paimon.catalog.Catalog.DatabaseNotEmptyException;
import org.apache.paimon.catalog.Catalog.DatabaseNotExistException;
import org.apache.paimon.catalog.Catalog.TableAlreadyExistException;
import org.apache.paimon.catalog.Catalog.TableNotExistException;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.Schema;
Expand Down Expand Up @@ -89,6 +94,16 @@ public void dropTable(String tableName) throws TableNotExistException {
catalog.dropTable(tableIdentifier(tableName), false);
}

public void alterTable(String tableName, TableChange... changes)
throws ColumnAlreadyExistException, TableNotExistException, ColumnNotExistException {
catalog.alterTable(tableIdentifier(tableName), buildSchemaChanges(changes), false);
}

public void renameTable(String fromTableName, String toTableName)
throws TableNotExistException, TableAlreadyExistException {
catalog.renameTable(tableIdentifier(fromTableName), tableIdentifier(toTableName), false);
}

private Identifier tableIdentifier(String tableName) {
return Identifier.fromString(tableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,115 @@
*/
package org.apache.gravitino.catalog.lakehouse.paimon.utils;

import static org.apache.gravitino.catalog.lakehouse.paimon.utils.TypeUtils.toPaimonType;
import static org.apache.paimon.schema.SchemaChange.addColumn;
import static org.apache.paimon.schema.SchemaChange.dropColumn;
import static org.apache.paimon.schema.SchemaChange.removeOption;
import static org.apache.paimon.schema.SchemaChange.renameColumn;
import static org.apache.paimon.schema.SchemaChange.setOption;
import static org.apache.paimon.schema.SchemaChange.updateColumnComment;
import static org.apache.paimon.schema.SchemaChange.updateColumnNullability;
import static org.apache.paimon.schema.SchemaChange.updateColumnPosition;
import static org.apache.paimon.schema.SchemaChange.updateColumnType;
import static org.apache.paimon.schema.SchemaChange.updateComment;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import org.apache.gravitino.catalog.lakehouse.paimon.ops.PaimonCatalogOps;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.TableChange.AddColumn;
import org.apache.gravitino.rel.TableChange.After;
import org.apache.gravitino.rel.TableChange.ColumnChange;
import org.apache.gravitino.rel.TableChange.ColumnPosition;
import org.apache.gravitino.rel.TableChange.Default;
import org.apache.gravitino.rel.TableChange.DeleteColumn;
import org.apache.gravitino.rel.TableChange.First;
import org.apache.gravitino.rel.TableChange.RemoveProperty;
import org.apache.gravitino.rel.TableChange.RenameColumn;
import org.apache.gravitino.rel.TableChange.SetProperty;
import org.apache.gravitino.rel.TableChange.UpdateColumnComment;
import org.apache.gravitino.rel.TableChange.UpdateColumnNullability;
import org.apache.gravitino.rel.TableChange.UpdateColumnPosition;
import org.apache.gravitino.rel.TableChange.UpdateColumnType;
import org.apache.gravitino.rel.TableChange.UpdateComment;
import org.apache.gravitino.rel.expressions.Expression;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaChange.Move;

/** Utilities of {@link PaimonCatalogOps} to support table operation. */
public class TableOpsUtils {

public static final Joiner DOT = Joiner.on(".");

public static void checkColumnCapability(
String fieldName, Expression defaultValue, boolean autoIncrement) {
checkColumnDefaultValue(fieldName, defaultValue);
checkColumnAutoIncrement(fieldName, autoIncrement);
}

public static List<SchemaChange> buildSchemaChanges(TableChange... tableChanges)
throws UnsupportedOperationException {
List<SchemaChange> schemaChanges = new ArrayList<>();
for (TableChange tableChange : tableChanges) {
schemaChanges.add(buildSchemaChange(tableChange));
}
return schemaChanges;
}

public static SchemaChange buildSchemaChange(TableChange tableChange)
throws UnsupportedOperationException {
if (tableChange instanceof ColumnChange) {
if (tableChange instanceof AddColumn) {
AddColumn addColumn = (AddColumn) tableChange;
String fieldName = getfieldName(addColumn);
checkColumnCapability(fieldName, addColumn.getDefaultValue(), addColumn.isAutoIncrement());
return addColumn(
fieldName,
toPaimonType(addColumn.getDataType()).copy(addColumn.isNullable()),
addColumn.getComment(),
move(fieldName, addColumn.getPosition()));
} else if (tableChange instanceof DeleteColumn) {
return dropColumn(getfieldName((DeleteColumn) tableChange));
} else if (tableChange instanceof RenameColumn) {
RenameColumn renameColumn = ((RenameColumn) tableChange);
return renameColumn(getfieldName(renameColumn), renameColumn.getNewName());
} else if (tableChange instanceof UpdateColumnComment) {
UpdateColumnComment updateColumnComment = (UpdateColumnComment) tableChange;
return updateColumnComment(
getfieldName(updateColumnComment), updateColumnComment.getNewComment());
} else if (tableChange instanceof UpdateColumnNullability) {
UpdateColumnNullability updateColumnNullability = (UpdateColumnNullability) tableChange;
return updateColumnNullability(
getfieldName(updateColumnNullability), updateColumnNullability.nullable());
} else if (tableChange instanceof UpdateColumnPosition) {
UpdateColumnPosition updateColumnPosition = (UpdateColumnPosition) tableChange;
Preconditions.checkArgument(
!(updateColumnPosition.getPosition() instanceof Default),
"Default position is not supported for Paimon update column position.");
return updateColumnPosition(
move(getfieldName(updateColumnPosition), updateColumnPosition.getPosition()));
} else if (tableChange instanceof UpdateColumnType) {
UpdateColumnType updateColumnType = (UpdateColumnType) tableChange;
return updateColumnType(
getfieldName(updateColumnType), toPaimonType(updateColumnType.getNewDataType()));
}
} else if (tableChange instanceof UpdateComment) {
return updateComment(((UpdateComment) tableChange).getNewComment());
} else if (tableChange instanceof SetProperty) {
SetProperty setProperty = ((SetProperty) tableChange);
return setOption(setProperty.getProperty(), setProperty.getValue());
} else if (tableChange instanceof RemoveProperty) {
RemoveProperty removeProperty = (RemoveProperty) tableChange;
return removeOption(removeProperty.getProperty());
}
throw new UnsupportedOperationException(
String.format(
"Paimon does not support %s table change.", tableChange.getClass().getSimpleName()));
}

private static void checkColumnDefaultValue(String fieldName, Expression defaultValue) {
Preconditions.checkArgument(
defaultValue.equals(Column.DEFAULT_VALUE_NOT_SET),
Expand All @@ -46,4 +141,39 @@ private static void checkColumnAutoIncrement(String fieldName, boolean autoIncre
String.format(
"Paimon does not support auto increment column. Illegal column: %s.", fieldName));
}

private static void checkNestedColumn(String[] fieldNames) {
Preconditions.checkArgument(
fieldNames.length == 1,
String.format(
"Paimon does not support update nested column. Illegal column: %s.",
getfieldName(fieldNames)));
}

public static String[] getfieldName(String fieldName) {
return new String[] {fieldName};
}

public static String getfieldName(String[] fieldName) {
return DOT.join(fieldName);
}

private static String getfieldName(ColumnChange columnChange) {
return getfieldName(columnChange.fieldName());
}

private static Move move(String fieldName, ColumnPosition columnPosition)
throws UnsupportedOperationException {
if (columnPosition instanceof After) {
return Move.after(fieldName, ((After) columnPosition).getColumn());
} else if (columnPosition instanceof Default) {
return null;
} else if (columnPosition instanceof First) {
return Move.first(fieldName);
}
throw new UnsupportedOperationException(
String.format(
"Paimon does not support %s column position.",
columnPosition.getClass().getSimpleName()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,13 @@ void testTableProperty() {
initBackendCatalogProperties(), entity.toCatalogInfo(), PAIMON_PROPERTIES_METADATA);
Map<String, String> map = Maps.newHashMap();
map.put(PaimonTablePropertiesMetadata.COMMENT, "test");
map.put(PaimonTablePropertiesMetadata.CREATOR, "test");
map.put(PaimonTablePropertiesMetadata.OWNER, "test");
map.put(PaimonTablePropertiesMetadata.BUCKET_KEY, "test");
map.put(PaimonTablePropertiesMetadata.MERGE_ENGINE, "test");
map.put(PaimonTablePropertiesMetadata.SEQUENCE_FIELD, "test");
map.put(PaimonTablePropertiesMetadata.ROWKIND_FIELD, "test");
map.put(PaimonTablePropertiesMetadata.PRIMARY_KEY, "test");
map.put(PaimonTablePropertiesMetadata.PARTITION, "test");
for (Map.Entry<String, String> entry : map.entrySet()) {
HashMap<String, String> properties =
new HashMap<String, String>() {
Expand Down
Loading

0 comments on commit 9b243bf

Please sign in to comment.