Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
linyanghao committed May 18, 2023
1 parent aafb3ce commit d4d75bc
Show file tree
Hide file tree
Showing 5 changed files with 419 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.Constraint;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
Expand All @@ -62,6 +66,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
Expand All @@ -78,6 +83,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;

/**
* A Flink Catalog implementation that wraps an Iceberg {@link Catalog}.
Expand Down Expand Up @@ -517,6 +523,172 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean
commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties);
}

@Override
public void alterTable(
ObjectPath tablePath,
CatalogBaseTable newTable,
List<TableChange> tableChanges,
boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
validateFlinkTable(newTable);

Table icebergTable;
try {
icebergTable = loadIcebergTable(tablePath);
} catch (TableNotExistException e) {
if (!ignoreIfNotExists) {
throw e;
} else {
return;
}
}

String setLocation = null;
String setSnapshotId = null;
String pickSnapshotId = null;

List<TableChange> propertyChanges = Lists.newArrayList();
List<TableChange> schemaChanges = Lists.newArrayList();
for (TableChange change : tableChanges) {
if (change instanceof TableChange.SetOption) {
TableChange.SetOption set = (TableChange.SetOption) change;

if ("location".equalsIgnoreCase(set.getKey())) {
setLocation = set.getValue();
} else if ("current-snapshot-id".equalsIgnoreCase(set.getKey())) {
setSnapshotId = set.getValue();
} else if ("cherry-pick-snapshot-id".equalsIgnoreCase(set.getKey())) {
pickSnapshotId = set.getValue();
} else {
propertyChanges.add(change);
}

} else if (change instanceof TableChange.ResetOption) {
propertyChanges.add(change);

} else {
schemaChanges.add(change);
}
}

commitChanges(
icebergTable, setLocation, setSnapshotId, pickSnapshotId, schemaChanges, propertyChanges);
}

private static void applySchemaChanges(
UpdateSchema updateSchema, List<TableChange> schemaChanges) {
for (TableChange change : schemaChanges) {
if (change instanceof TableChange.AddColumn) {
TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
if (!FlinkCompatibilityUtil.isPhysicalColumn(addColumn.getColumn())) {
throw new UnsupportedOperationException("Adding computed columns is not supported yet.");
}
Column flinkColumn = addColumn.getColumn();
Type icebergType = FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType());
updateSchema.addColumn(flinkColumn.getName(), icebergType);

} else if (change instanceof TableChange.ModifyColumn) {
TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) change;
applyModifyColumn(updateSchema, modifyColumn);

} else if (change instanceof TableChange.DropColumn) {
TableChange.DropColumn dropColumn = (TableChange.DropColumn) change;
updateSchema.deleteColumn(dropColumn.getColumnName());

} else if (change instanceof TableChange.AddWatermark) {
throw new UnsupportedOperationException("Adding watermark specs is not supported yet. ");

} else if (change instanceof TableChange.ModifyWatermark) {
throw new UnsupportedOperationException("Modifying watermark specs is not supported yet. ");

} else if (change instanceof TableChange.DropWatermark) {
throw new UnsupportedOperationException("Watermark specs is not supported yet. ");

} else if (change instanceof TableChange.AddUniqueConstraint) {
TableChange.AddUniqueConstraint addPk = (TableChange.AddUniqueConstraint) change;
applyUniqueConstraint(updateSchema, addPk.getConstraint());

} else if (change instanceof TableChange.ModifyUniqueConstraint) {
TableChange.ModifyUniqueConstraint modifyPk = (TableChange.ModifyUniqueConstraint) change;
applyUniqueConstraint(updateSchema, modifyPk.getNewConstraint());

} else if (change instanceof TableChange.DropConstraint) {
throw new UnsupportedOperationException("Dropping constraints is not supported yet. ");

} else {
throw new UnsupportedOperationException("Cannot apply unknown table change: " + change);
}
}
}

private static void applyPropertyChanges(
UpdateProperties updateProperties, List<TableChange> propertyChanges) {
for (TableChange change : propertyChanges) {
if (change instanceof TableChange.SetOption) {
TableChange.SetOption setOption = (TableChange.SetOption) change;
updateProperties.set(setOption.getKey(), setOption.getValue());

} else if (change instanceof TableChange.ResetOption) {
TableChange.ResetOption resetOption = (TableChange.ResetOption) change;
updateProperties.remove(resetOption.getKey());

} else {
throw new UnsupportedOperationException("Cannot apply unknown table change: " + change);
}
}
}

private static void applyModifyColumn(
UpdateSchema pendingUpdate, TableChange.ModifyColumn modifyColumn) {
if (modifyColumn instanceof TableChange.ModifyColumnName) {
TableChange.ModifyColumnName modifyName = (TableChange.ModifyColumnName) modifyColumn;
pendingUpdate.renameColumn(modifyName.getOldColumnName(), modifyName.getNewColumnName());

} else if (modifyColumn instanceof TableChange.ModifyColumnPosition) {
TableChange.ModifyColumnPosition modifyPosition =
(TableChange.ModifyColumnPosition) modifyColumn;
applyModifyColumnPosition(pendingUpdate, modifyPosition);

} else if (modifyColumn instanceof TableChange.ModifyPhysicalColumnType) {
TableChange.ModifyPhysicalColumnType modifyType =
(TableChange.ModifyPhysicalColumnType) modifyColumn;
Type type = FlinkSchemaUtil.convert(modifyType.getNewType().getLogicalType());
pendingUpdate.updateColumn(modifyType.getOldColumn().getName(), type.asPrimitiveType());

} else if (modifyColumn instanceof TableChange.ModifyColumnComment) {
TableChange.ModifyColumnComment modifyComment =
(TableChange.ModifyColumnComment) modifyColumn;
pendingUpdate.updateColumnDoc(
modifyComment.getOldColumn().getName(), modifyComment.getNewComment());

} else {
throw new UnsupportedOperationException("Cannot apply unknown table change: " + modifyColumn);
}
}

private static void applyModifyColumnPosition(
UpdateSchema pendingUpdate, TableChange.ModifyColumnPosition modifyColumnPosition) {
TableChange.ColumnPosition newPosition = modifyColumnPosition.getNewPosition();
if (newPosition instanceof TableChange.First) {
pendingUpdate.moveFirst(modifyColumnPosition.getOldColumn().getName());
} else if (newPosition instanceof TableChange.After) {
TableChange.After after = (TableChange.After) newPosition;
pendingUpdate.moveAfter(modifyColumnPosition.getOldColumn().getName(), after.column());
} else {
throw new UnsupportedOperationException(
"Cannot apply unknown table change: " + modifyColumnPosition);
}
}

private static void applyUniqueConstraint(
UpdateSchema pendingUpdate, UniqueConstraint constraint) {
if (constraint.getType() == Constraint.ConstraintType.UNIQUE_KEY) {
throw new UnsupportedOperationException(
"Setting unique key constraint is not supported yet.");
}
pendingUpdate.setIdentifierFields(constraint.getColumns());
}

private static void validateFlinkTable(CatalogBaseTable table) {
Preconditions.checkArgument(
table instanceof CatalogTable, "The Table should be a CatalogTable.");
Expand Down Expand Up @@ -605,6 +777,52 @@ private static void commitChanges(
transaction.commitTransaction();
}

private static void commitChanges(
Table table,
String setLocation,
String setSnapshotId,
String pickSnapshotId,
List<TableChange> schemaChanges,
List<TableChange> propertyChanges) {
// don't allow setting the snapshot and picking a commit at the same time because order is
// ambiguous and choosing
// one order leads to different results
Preconditions.checkArgument(
setSnapshotId == null || pickSnapshotId == null,
"Cannot set the current snapshot ID and cherry-pick snapshot changes");

if (setSnapshotId != null) {
long newSnapshotId = Long.parseLong(setSnapshotId);
table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit();
}

// if updating the table snapshot, perform that update first in case it fails
if (pickSnapshotId != null) {
long newSnapshotId = Long.parseLong(pickSnapshotId);
table.manageSnapshots().cherrypick(newSnapshotId).commit();
}

Transaction transaction = table.newTransaction();

if (setLocation != null) {
transaction.updateLocation().setLocation(setLocation).commit();
}

if (!schemaChanges.isEmpty()) {
UpdateSchema updateSchema = transaction.updateSchema();
applySchemaChanges(updateSchema, schemaChanges);
updateSchema.commit();
}

if (!propertyChanges.isEmpty()) {
UpdateProperties updateProperties = transaction.updateProperties();
applyPropertyChanges(updateProperties, propertyChanges);
updateProperties.commit();
}

transaction.commitTransaction();
}

static CatalogTable toCatalogTable(Table table) {
TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,16 @@ public static LogicalType convert(Type type) {
return TypeUtil.visit(type, new TypeToFlinkType());
}

/**
* Convert a {@link LogicalType Flink type} to a {@link Type}.
*
* @param flinkType a FlinkType
* @return the equivalent Iceberg type
*/
public static Type convert(LogicalType flinkType) {
return flinkType.accept(new FlinkTypeToType());
}

/**
* Convert a {@link RowType} to a {@link TableSchema}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class FlinkTypeToType extends FlinkTypeVisitor<Type> {
private final RowType root;
private int nextId;

FlinkTypeToType() {
this.root = null;
}

FlinkTypeToType(RowType root) {
this.root = root;
// the root struct's fields use the first ids
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
Expand All @@ -39,4 +40,8 @@ public static TypeInformation<RowData> toTypeInfo(RowType rowType) {
public static boolean isPhysicalColumn(TableColumn column) {
return column.isPhysical();
}

public static boolean isPhysicalColumn(Column column) {
return column.isPhysical();
}
}
Loading

0 comments on commit d4d75bc

Please sign in to comment.