Skip to content

Commit

Permalink
[#3371] feat(flink-connector): support basic table operation
Browse files Browse the repository at this point in the history
  • Loading branch information
coolderli committed Jul 2, 2024
1 parent d5d3fa2 commit 064dcc1
Show file tree
Hide file tree
Showing 7 changed files with 455 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,32 @@ default Map<String, String> toGravitinoSchemaProperties(Map<String, String> flin
}

/**
 * Converts properties from Flink connector table properties to Gravitino table properties.
 *
 * <p>The default implementation is an identity mapping; engine-specific converters override
 * the conversion methods to translate or filter keys.
 *
 * @param flinkProperties The table properties provided by Flink.
 * @return The table properties for the Gravitino.
 */
default Map<String, String> toGravitinoTableProperties(Map<String, String> flinkProperties) {
return flinkProperties;
}

/**
 * Converts properties from Gravitino schema properties to Flink connector database properties.
 *
 * <p>The default implementation is an identity mapping; engine-specific converters override
 * the conversion methods to translate or filter keys.
 *
 * @param gravitinoProperties The schema properties provided by Gravitino.
 * @return The database properties for the Flink connector.
 */
default Map<String, String> toFlinkDatabaseProperties(Map<String, String> gravitinoProperties) {
return gravitinoProperties;
}

/**
 * Converts properties from Gravitino table properties to Flink connector table properties.
 *
 * <p>The default implementation is an identity mapping; engine-specific converters override
 * the conversion methods to translate or filter keys.
 *
 * @param gravitinoProperties The table properties provided by Gravitino.
 * @return The table properties for the Flink connector.
 */
default Map<String, String> toFlinkTableProperties(Map<String, String> gravitinoProperties) {
return gravitinoProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,29 @@
package com.datastrato.gravitino.flink.connector.catalog;

import com.datastrato.gravitino.Catalog;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.Schema;
import com.datastrato.gravitino.SchemaChange;
import com.datastrato.gravitino.exceptions.NoSuchCatalogException;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.NoSuchTableException;
import com.datastrato.gravitino.exceptions.NonEmptySchemaException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import com.datastrato.gravitino.exceptions.TableAlreadyExistsException;
import com.datastrato.gravitino.flink.connector.PropertiesConverter;
import com.datastrato.gravitino.flink.connector.utils.TypeUtils;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.Table;
import com.datastrato.gravitino.rel.TableChange;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
Expand All @@ -27,7 +37,9 @@
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
Expand Down Expand Up @@ -135,8 +147,18 @@ public void alterDatabase(
}

@Override
public List<String> listTables(String s) throws DatabaseNotExistException, CatalogException {
throw new UnsupportedOperationException();
public List<String> listTables(String databaseName)
throws DatabaseNotExistException, CatalogException {
try {
return Stream.of(
catalog()
.asTableCatalog()
.listTables(Namespace.of(metalakeName(), catalogName(), databaseName)))
.map(NameIdentifier::name)
.collect(Collectors.toList());
} catch (NoSuchSchemaException e) {
throw new DatabaseNotExistException(catalogName(), databaseName);
}
}

@Override
Expand All @@ -145,32 +167,108 @@ public List<String> listViews(String s) throws DatabaseNotExistException, Catalo
}

@Override
public CatalogBaseTable getTable(ObjectPath objectPath)
public CatalogBaseTable getTable(ObjectPath tablePath)
throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
try {
Table table =
catalog()
.asTableCatalog()
.loadTable(
NameIdentifier.of(
metalakeName(),
catalogName(),
tablePath.getDatabaseName(),
tablePath.getObjectName()));
return toFlinkTable(table);
} catch (NoSuchCatalogException e) {
throw new CatalogException(e);
}
}

@Override
public boolean tableExists(ObjectPath objectPath) throws CatalogException {
throw new UnsupportedOperationException();
public boolean tableExists(ObjectPath tablePath) throws CatalogException {
return catalog()
.asTableCatalog()
.tableExists(
NameIdentifier.of(
metalakeName(),
catalogName(),
tablePath.getDatabaseName(),
tablePath.getObjectName()));
}

@Override
public void dropTable(ObjectPath objectPath, boolean b)
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
boolean dropped =
catalog()
.asTableCatalog()
.dropTable(
NameIdentifier.of(
metalakeName(),
catalogName(),
tablePath.getDatabaseName(),
tablePath.getObjectName()));
if (!dropped && !ignoreIfNotExists) {
throw new TableNotExistException(catalogName(), tablePath);
}
}

@Override
public void renameTable(ObjectPath objectPath, String s, boolean b)
public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException, CatalogException {
throw new UnsupportedOperationException();
NameIdentifier identifier =
NameIdentifier.of(
Namespace.of(metalakeName(), catalogName(), tablePath.getDatabaseName()), newTableName);

if (catalog().asTableCatalog().tableExists(identifier)) {
throw new TableAlreadyExistException(
catalogName(), ObjectPath.fromString(tablePath.getDatabaseName() + newTableName));
}

try {
catalog()
.asTableCatalog()
.alterTable(
NameIdentifier.of(
metalakeName(),
catalogName(),
tablePath.getDatabaseName(),
tablePath.getObjectName()),
TableChange.rename(newTableName));
} catch (NoSuchCatalogException e) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(catalogName(), tablePath);
}
}
}

@Override
public void createTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean b)
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
throw new UnsupportedOperationException();
NameIdentifier identifier =
NameIdentifier.of(
metalakeName(), catalogName(), tablePath.getDatabaseName(), tablePath.getObjectName());

ResolvedCatalogBaseTable<?> resolvedTable = (ResolvedCatalogBaseTable<?>) table;
Column[] columns =
resolvedTable.getResolvedSchema().getColumns().stream()
.map(this::toGravitinoColumn)
.toArray(Column[]::new);
String comment = table.getComment();
Map<String, String> properties =
propertiesConverter.toGravitinoTableProperties(table.getOptions());
try {
catalog().asTableCatalog().createTable(identifier, columns, comment, properties);
} catch (NoSuchSchemaException e) {
throw new DatabaseNotExistException(catalogName(), tablePath.getDatabaseName(), e);
} catch (TableAlreadyExistsException e) {
if (!ignoreIfExists) {
throw new TableAlreadyExistException(catalogName(), tablePath, e);
}
} catch (Exception e) {
throw new CatalogException(e);
}
}

@Override
Expand Down Expand Up @@ -337,6 +435,29 @@ public void alterPartitionColumnStatistics(

protected abstract PropertiesConverter getPropertiesConverter();

/**
 * Converts a Gravitino table into a Flink {@link CatalogTable}.
 *
 * <p>NOTE(review): partition keys are not propagated (an empty list is passed) — confirm
 * whether partitioned tables need to be supported here.
 *
 * @param table the Gravitino table to convert
 * @return the equivalent Flink catalog table
 */
protected CatalogBaseTable toFlinkTable(Table table) {
  org.apache.flink.table.api.Schema.Builder schemaBuilder =
      org.apache.flink.table.api.Schema.newBuilder();
  Arrays.stream(table.columns())
      .forEach(
          column ->
              schemaBuilder
                  .column(column.name(), TypeUtils.toFlinkType(column.dataType()))
                  .withComment(column.comment()));
  return CatalogTable.of(
      schemaBuilder.build(), table.comment(), ImmutableList.of(), table.properties());
}

/**
 * Converts a single Flink column into a Gravitino column.
 *
 * @param column the Flink catalog column
 * @return the equivalent Gravitino column; auto-increment is always false and no default
 *     value is set, since Flink columns carry neither here
 */
private Column toGravitinoColumn(org.apache.flink.table.catalog.Column column) {
  String columnComment = column.getComment().orElse(null);
  boolean nullable = column.getDataType().getLogicalType().isNullable();
  return Column.of(
      column.getName(),
      TypeUtils.toGravitinoType(column.getDataType().getLogicalType()),
      columnComment,
      nullable,
      /* autoIncrement= */ false,
      /* defaultValue= */ null);
}

@VisibleForTesting
static SchemaChange[] getSchemaChange(CatalogDatabase current, CatalogDatabase updated) {
Map<String, String> currentProperties = current.getProperties();
Expand Down Expand Up @@ -367,4 +488,8 @@ private Catalog catalog() {
private String catalogName() {
return getName();
}

// Name of the Gravitino metalake this catalog belongs to, resolved from the
// GravitinoCatalogManager (presumably a process-wide singleton accessor — the
// manager's lifecycle is not visible from this file).
private String metalakeName() {
return GravitinoCatalogManager.get().getMetalakeName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
package com.datastrato.gravitino.flink.connector.hive;

import com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta;
import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata;
import com.datastrato.gravitino.flink.connector.PropertiesConverter;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.hadoop.hive.conf.HiveConf;
Expand Down Expand Up @@ -62,4 +64,22 @@ public Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitin
});
return flinkCatalogProperties;
}

/**
 * Converts Gravitino Hive table properties to Flink table properties by stripping the
 * Hive serde-parameter prefix from any key that carries it; all other keys and every
 * value pass through unchanged.
 *
 * <p>NOTE(review): if both a prefixed key and its unprefixed twin exist, they collide after
 * stripping and the merge function keeps whichever entry the (unordered) map iteration
 * visits last — confirm this nondeterminism is acceptable.
 */
@Override
public Map<String, String> toFlinkTableProperties(Map<String, String> gravitinoProperties) {
return gravitinoProperties.entrySet().stream()
.collect(
Collectors.toMap(
entry -> {
String key = entry.getKey();
if (key.startsWith(HiveTablePropertiesMetadata.SERDE_PARAMETER_PREFIX)) {
return key.substring(
HiveTablePropertiesMetadata.SERDE_PARAMETER_PREFIX.length());
} else {
return key;
}
},
Map.Entry::getValue,
// On duplicate stripped keys, keep the later entry.
(existingValue, newValue) -> newValue));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.flink.connector.utils;

import com.datastrato.gravitino.rel.types.Type;
import com.datastrato.gravitino.rel.types.Types;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;

/** Utility for translating column types between Gravitino and Flink. */
public class TypeUtils {

  // Static utility holder; never instantiated.
  private TypeUtils() {}

  /**
   * Maps a Flink logical type to a Gravitino type.
   *
   * <p>Only VARCHAR and DOUBLE are supported so far; anything else is rejected.
   *
   * @param logicalType the Flink logical type
   * @return the corresponding Gravitino type
   * @throws UnsupportedOperationException for any unmapped type root
   */
  public static Type toGravitinoType(LogicalType logicalType) {
    switch (logicalType.getTypeRoot()) {
      case DOUBLE:
        return Types.DoubleType.get();
      case VARCHAR:
        return Types.StringType.get();
      default:
        throw new UnsupportedOperationException(
            "Not support type: " + logicalType.asSummaryString());
    }
  }

  /**
   * Maps a Gravitino type to a Flink data type.
   *
   * <p>Mirror of {@link #toGravitinoType(LogicalType)}: only string and double are handled.
   *
   * @param gravitinoType the Gravitino type
   * @return the corresponding Flink data type
   * @throws UnsupportedOperationException for any unmapped type
   */
  public static DataType toFlinkType(Type gravitinoType) {
    if (gravitinoType instanceof Types.StringType) {
      return DataTypes.STRING();
    }
    if (gravitinoType instanceof Types.DoubleType) {
      return DataTypes.DOUBLE();
    }
    throw new UnsupportedOperationException("Not support " + gravitinoType.toString());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.datastrato.gravitino.integration.test.container.HiveContainer;
import com.datastrato.gravitino.integration.test.util.AbstractIT;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
import java.io.IOException;
Expand Down Expand Up @@ -141,6 +142,17 @@ protected TableResult sql(@FormatString String sql, Object... args) {
return tableEnv.executeSql(String.format(sql, args));
}

/**
 * Runs the given action with the catalog selected and the given schema set as the current
 * database, creating the schema first if it does not yet exist.
 *
 * @param catalog the Gravitino catalog to operate on; must not be null
 * @param schemaName the schema (database) to use; must not be null
 * @param action the work to perform while the schema is current
 */
protected static void doWithSchema(Catalog catalog, String schemaName, Consumer<Catalog> action) {
  Preconditions.checkNotNull(catalog);
  Preconditions.checkNotNull(schemaName);
  tableEnv.useCatalog(catalog.name());
  boolean schemaMissing = !catalog.asSchemas().schemaExists(schemaName);
  if (schemaMissing) {
    // Create on demand so callers need not pre-provision the schema.
    catalog.asSchemas().createSchema(schemaName, null, ImmutableMap.of());
  }
  tableEnv.useDatabase(schemaName);
  action.accept(catalog);
}

protected static void doWithCatalog(Catalog catalog, Consumer<Catalog> action) {
Preconditions.checkNotNull(catalog);
tableEnv.useCatalog(catalog.name());
Expand Down
Loading

0 comments on commit 064dcc1

Please sign in to comment.