Skip to content

Commit

Permalink
[apache#3371] feat(flink-connector): support table operation
Browse files Browse the repository at this point in the history
  • Loading branch information
coolderli committed Jun 27, 2024
1 parent f5f092f commit 9ced329
Show file tree
Hide file tree
Showing 11 changed files with 989 additions and 188 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,32 @@ default Map<String, String> toGravitinoSchemaProperties(Map<String, String> flin
}

/**
* Converts properties from Gravitino schema properties to Flink connector schema properties.
* Converts properties from Flink connector table properties to Gravitino table properties.
*
* @param flinkProperties The table properties provided by Flink.
* @return The table properties for the Gravitino.
*/
default Map<String, String> toGravitinoTableProperties(Map<String, String> flinkProperties) {
  // Identity conversion by default; connector-specific converters override this
  // when Flink and Gravitino use different table-property keys.
  return flinkProperties;
}

/**
* Converts properties from Gravitino schema properties to Flink connector database properties.
*
* @param gravitinoProperties The schema properties provided by Gravitino.
* @return The schema properties for the Flink connector.
*/
default Map<String, String> toFlinkSchemaProperties(Map<String, String> gravitinoProperties) {
default Map<String, String> toFlinkDatabaseProperties(Map<String, String> gravitinoProperties) {
return gravitinoProperties;
}

/**
* Converts properties from Gravitino table properties to Flink connector table properties.
*
* @param gravitinoProperties The table properties provided by Gravitino.
* @return The table properties for the Flink connector.
*/
default Map<String, String> toFlinkTableProperties(Map<String, String> gravitinoProperties) {
  // Identity conversion by default; connector-specific converters override this
  // when Gravitino-side table properties need renaming for Flink.
  return gravitinoProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,29 @@
package com.datastrato.gravitino.flink.connector.catalog;

import com.datastrato.gravitino.Catalog;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.Schema;
import com.datastrato.gravitino.SchemaChange;
import com.datastrato.gravitino.exceptions.NoSuchCatalogException;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.NoSuchTableException;
import com.datastrato.gravitino.exceptions.NonEmptySchemaException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import com.datastrato.gravitino.exceptions.TableAlreadyExistsException;
import com.datastrato.gravitino.flink.connector.PropertiesConverter;
import com.datastrato.gravitino.flink.connector.utils.TypeUtils;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.Table;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
Expand All @@ -27,7 +37,9 @@
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
Expand Down Expand Up @@ -74,10 +86,10 @@ public CatalogDatabase getDatabase(String databaseName)
try {
Schema schema = catalog().asSchemas().loadSchema(databaseName);
Map<String, String> properties =
propertiesConverter.toFlinkSchemaProperties(schema.properties());
propertiesConverter.toFlinkDatabaseProperties(schema.properties());
return new CatalogDatabaseImpl(properties, schema.comment());
} catch (NoSuchSchemaException e) {
throw new DatabaseNotExistException(getName(), databaseName);
throw new DatabaseNotExistException(catalogName(), databaseName);
}
}

Expand All @@ -96,7 +108,7 @@ public void createDatabase(
catalog().asSchemas().createSchema(databaseName, catalogDatabase.getComment(), properties);
} catch (SchemaAlreadyExistsException e) {
if (!ignoreIfExists) {
throw new DatabaseAlreadyExistException(getName(), databaseName);
throw new DatabaseAlreadyExistException(catalogName(), databaseName);
}
} catch (NoSuchCatalogException e) {
throw new CatalogException(e);
Expand All @@ -109,10 +121,10 @@ public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean
try {
boolean dropped = catalog().asSchemas().dropSchema(databaseName, cascade);
if (!dropped && !ignoreIfNotExists) {
throw new DatabaseNotExistException(getName(), databaseName);
throw new DatabaseNotExistException(catalogName(), databaseName);
}
} catch (NonEmptySchemaException e) {
throw new DatabaseNotEmptyException(getName(), databaseName);
throw new DatabaseNotEmptyException(catalogName(), databaseName);
} catch (NoSuchCatalogException e) {
throw new CatalogException(e);
}
Expand All @@ -127,16 +139,26 @@ public void alterDatabase(
catalog().asSchemas().alterSchema(databaseName, schemaChanges);
} catch (NoSuchSchemaException e) {
if (!ignoreIfNotExists) {
throw new DatabaseNotExistException(getName(), databaseName);
throw new DatabaseNotExistException(catalogName(), databaseName);
}
} catch (NoSuchCatalogException e) {
throw new CatalogException(e);
}
}

@Override
public List<String> listTables(String s) throws DatabaseNotExistException, CatalogException {
throw new UnsupportedOperationException();
public List<String> listTables(String databaseName)
throws DatabaseNotExistException, CatalogException {
try {
return Stream.of(
catalog()
.asTableCatalog()
.listTables(Namespace.of(metalakeName(), catalogName(), databaseName)))
.map(NameIdentifier::name)
.collect(Collectors.toList());
} catch (NoSuchSchemaException e) {
throw new DatabaseNotExistException(catalogName(), databaseName);
}
}

@Override
Expand All @@ -145,36 +167,112 @@ public List<String> listViews(String s) throws DatabaseNotExistException, Catalo
}

@Override
public CatalogBaseTable getTable(ObjectPath objectPath)
public CatalogBaseTable getTable(ObjectPath tablePath)
throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
try {
Table table =
catalog()
.asTableCatalog()
.loadTable(
NameIdentifier.of(
metalakeName(),
catalogName(),
tablePath.getDatabaseName(),
tablePath.getObjectName()));
return toFlinkTable(table);
} catch (NoSuchCatalogException e) {
throw new CatalogException(e);
}
}

@Override
public boolean tableExists(ObjectPath objectPath) throws CatalogException {
throw new UnsupportedOperationException();
public boolean tableExists(ObjectPath tablePath) throws CatalogException {
return catalog()
.asTableCatalog()
.tableExists(
NameIdentifier.of(
metalakeName(),
catalogName(),
tablePath.getDatabaseName(),
tablePath.getObjectName()));
}

@Override
public void dropTable(ObjectPath objectPath, boolean b)
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
boolean dropped =
catalog()
.asTableCatalog()
.dropTable(
NameIdentifier.of(
metalakeName(),
catalogName(),
tablePath.getDatabaseName(),
tablePath.getObjectName()));
if (!dropped && !ignoreIfNotExists) {
throw new TableNotExistException(catalogName(), tablePath);
}
}

@Override
public void renameTable(ObjectPath objectPath, String s, boolean b)
public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException, CatalogException {
throw new UnsupportedOperationException();
NameIdentifier identifier =
NameIdentifier.of(
Namespace.of(metalakeName(), catalogName(), tablePath.getDatabaseName()), newTableName);

if (catalog().asTableCatalog().tableExists(identifier)) {
throw new TableAlreadyExistException(
catalogName(), ObjectPath.fromString(tablePath.getDatabaseName() + newTableName));
}

try {
catalog()
.asTableCatalog()
.alterTable(
NameIdentifier.of(
metalakeName(),
catalogName(),
tablePath.getDatabaseName(),
tablePath.getObjectName()),
TableChange.rename(newTableName));
} catch (NoSuchCatalogException e) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(catalogName(), tablePath);
}
}
}

@Override
public void createTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean b)
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
throw new UnsupportedOperationException();
NameIdentifier identifier =
NameIdentifier.of(
metalakeName(), catalogName(), tablePath.getDatabaseName(), tablePath.getObjectName());

ResolvedCatalogBaseTable<?> resolvedTable = (ResolvedCatalogBaseTable<?>) table;
Column[] columns =
resolvedTable.getResolvedSchema().getColumns().stream()
.map(this::toGravitinoColumn)
.toArray(Column[]::new);
String comment = table.getComment();
Map<String, String> properties =
propertiesConverter.toGravitinoTableProperties(table.getOptions());
try {
catalog().asTableCatalog().createTable(identifier, columns, comment, properties);
} catch (NoSuchSchemaException e) {
throw new DatabaseNotExistException(catalogName(), tablePath.getDatabaseName(), e);
} catch (TableAlreadyExistsException e) {
if (!ignoreIfExists) {
throw new TableAlreadyExistException(catalogName(), tablePath, e);
}
} catch (Exception e) {
throw new CatalogException(e);
}
}

@Override
public void alterTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean b)
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
Expand Down Expand Up @@ -337,6 +435,31 @@ public void alterPartitionColumnStatistics(

protected abstract PropertiesConverter getPropertiesConverter();

/**
 * Translates a Gravitino table into a Flink {@link CatalogBaseTable}: columns with their
 * types and comments, the table comment, partition keys, and table properties.
 */
protected CatalogBaseTable toFlinkTable(Table table) {
  org.apache.flink.table.api.Schema.Builder schemaBuilder =
      org.apache.flink.table.api.Schema.newBuilder();
  for (Column column : table.columns()) {
    schemaBuilder
        .column(column.name(), TypeUtils.toFlinkType(column.dataType()))
        .withComment(column.comment());
  }

  // Partition keys are identified by the name of each partitioning transform.
  List<String> partitionKeys = Lists.newArrayList();
  for (Transform transform : table.partitioning()) {
    partitionKeys.add(transform.name());
  }
  return CatalogTable.of(
      schemaBuilder.build(), table.comment(), partitionKeys, table.properties());
}

/** Converts a resolved Flink column into a Gravitino column. */
private Column toGravitinoColumn(org.apache.flink.table.catalog.Column column) {
  // Flink tracks nullability on the logical type; Gravitino tracks it on the column.
  boolean nullable = column.getDataType().getLogicalType().isNullable();
  String comment = column.getComment().orElse(null);
  return Column.of(
      column.getName(),
      TypeUtils.toGravitinoType(column.getDataType().getLogicalType()),
      comment,
      nullable,
      false, // auto-increment is not carried over from Flink
      null); // no default value
}

@VisibleForTesting
static SchemaChange[] getSchemaChange(CatalogDatabase current, CatalogDatabase updated) {
Map<String, String> currentProperties = current.getProperties();
Expand All @@ -363,4 +486,12 @@ static SchemaChange[] getSchemaChange(CatalogDatabase current, CatalogDatabase u
private Catalog catalog() {
  // Resolves the Gravitino catalog backing this Flink catalog by its name.
  return GravitinoCatalogManager.get().getGravitinoCatalogInfo(getName());
}

private String catalogName() {
  // Alias of getName(); kept for readability at Gravitino name-building call sites.
  return getName();
}

private String metalakeName() {
  // The metalake is the top level of Gravitino's name hierarchy
  // (metalake.catalog.schema.table).
  return GravitinoCatalogManager.get().getMetalakeName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ public static GravitinoCatalogManager get() {
* <p>After close, GravitinoCatalogManager can not be used anymore.
*/
public void close() {
  // Idempotent: repeated close() calls are no-ops so independent callers can shut
  // down without coordinating. The singleton reference is cleared so a later get()
  // does not hand out a closed manager.
  // NOTE(review): not synchronized — assumes close() is invoked from a single
  // thread; confirm with callers before relying on concurrent shutdown.
  if (!isClosed) {
    isClosed = true;
    gravitinoClient.close();
    gravitinoCatalogManager = null;
  }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
package com.datastrato.gravitino.flink.connector.hive;

import com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta;
import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata;
import com.datastrato.gravitino.flink.connector.PropertiesConverter;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.hadoop.hive.conf.HiveConf;
Expand Down Expand Up @@ -62,4 +64,22 @@ public Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitin
});
return flinkCatalogProperties;
}

@Override
public Map<String, String> toFlinkDatabaseProperties(Map<String, String> gravitinoProperties) {
  // Strip Gravitino's SERDE parameter prefix so Flink sees the bare property name;
  // all other keys pass through unchanged. Later entries win on key collisions.
  Map<String, String> flinkProperties = Maps.newHashMap();
  String serdePrefix = HiveTablePropertiesMetadata.SERDE_PARAMETER_PREFIX;
  for (Map.Entry<String, String> entry : gravitinoProperties.entrySet()) {
    String key = entry.getKey();
    String flinkKey = key.startsWith(serdePrefix) ? key.substring(serdePrefix.length()) : key;
    flinkProperties.put(flinkKey, entry.getValue());
  }
  return flinkProperties;
}
}
Loading

0 comments on commit 9ced329

Please sign in to comment.