Skip to content

Commit

Permalink
[#3371] feat(flink-connector): support basic table operation (#3795)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

- Support table operation on Flink

### Why are the changes needed?

- Fix: #3371

### Does this PR introduce _any_ user-facing change?

- no

### How was this patch tested?
- Added unit tests (UTs) and integration tests (ITs)
  • Loading branch information
coolderli authored Jul 5, 2024
1 parent 4312b63 commit 6a3e0f3
Show file tree
Hide file tree
Showing 12 changed files with 563 additions and 29 deletions.
1 change: 1 addition & 0 deletions flink-connector/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ dependencies {
exclude("com.google.code.findbugs", "jsr305")
}
testImplementation("org.apache.flink:flink-table-planner_$scalaVersion:$flinkVersion")
testImplementation("org.apache.flink:flink-test-utils:$flinkVersion")

testRuntimeOnly(libs.junit.jupiter.engine)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,24 @@ default Map<String, String> toGravitinoSchemaProperties(Map<String, String> flin
default Map<String, String> toFlinkDatabaseProperties(Map<String, String> gravitinoProperties) {
return gravitinoProperties;
}

/**
 * Converts properties from Gravitino table properties to Flink connector table properties.
 *
 * <p>The default implementation passes the map through unchanged; connector implementations
 * that need key translation (e.g. prefix stripping) should override this method.
 *
 * @param gravitinoProperties The table properties provided by Gravitino.
 * @return The table properties for the Flink connector.
 */
default Map<String, String> toFlinkTableProperties(Map<String, String> gravitinoProperties) {
return gravitinoProperties;
}

/**
 * Converts properties from Flink connector table properties to Gravitino table properties.
 *
 * <p>The default implementation passes the map through unchanged; connector implementations
 * that need key translation should override this method.
 *
 * @param flinkProperties The table properties provided by Flink.
 * @return The table properties for Gravitino.
 */
default Map<String, String> toGravitinoTableProperties(Map<String, String> flinkProperties) {
return flinkProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,30 @@
package com.datastrato.gravitino.flink.connector.catalog;

import com.datastrato.gravitino.Catalog;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.Schema;
import com.datastrato.gravitino.SchemaChange;
import com.datastrato.gravitino.exceptions.NoSuchCatalogException;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.NoSuchTableException;
import com.datastrato.gravitino.exceptions.NonEmptySchemaException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import com.datastrato.gravitino.exceptions.TableAlreadyExistsException;
import com.datastrato.gravitino.flink.connector.PropertiesConverter;
import com.datastrato.gravitino.flink.connector.utils.TypeUtils;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.Table;
import com.datastrato.gravitino.rel.TableChange;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
Expand All @@ -41,7 +52,9 @@
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
Expand All @@ -58,6 +71,7 @@
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.types.DataType;

/**
* The BaseCatalog that provides a default implementation for all methods in the {@link
Expand Down Expand Up @@ -149,8 +163,17 @@ public void alterDatabase(
}

@Override
public List<String> listTables(String s) throws DatabaseNotExistException, CatalogException {
throw new UnsupportedOperationException();
public List<String> listTables(String databaseName)
throws DatabaseNotExistException, CatalogException {
try {
return Stream.of(catalog().asTableCatalog().listTables(Namespace.of(databaseName)))
.map(NameIdentifier::name)
.collect(Collectors.toList());
} catch (NoSuchSchemaException e) {
throw new DatabaseNotExistException(catalogName(), databaseName, e);
} catch (Exception e) {
throw new CatalogException(e);
}
}

@Override
Expand All @@ -159,32 +182,95 @@ public List<String> listViews(String s) throws DatabaseNotExistException, Catalo
}

@Override
public CatalogBaseTable getTable(ObjectPath objectPath)
public CatalogBaseTable getTable(ObjectPath tablePath)
throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
try {
Table table =
catalog()
.asTableCatalog()
.loadTable(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName()));
return toFlinkTable(table);
} catch (NoSuchTableException e) {
throw new TableNotExistException(catalogName(), tablePath, e);
} catch (Exception e) {
throw new CatalogException(e);
}
}

@Override
public boolean tableExists(ObjectPath objectPath) throws CatalogException {
throw new UnsupportedOperationException();
public boolean tableExists(ObjectPath tablePath) throws CatalogException {
try {
return catalog()
.asTableCatalog()
.tableExists(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName()));
} catch (Exception e) {
throw new CatalogException(e);
}
}

@Override
public void dropTable(ObjectPath objectPath, boolean b)
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
boolean dropped =
catalog()
.asTableCatalog()
.dropTable(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName()));
if (!dropped && !ignoreIfNotExists) {
throw new TableNotExistException(catalogName(), tablePath);
}
}

@Override
public void renameTable(ObjectPath objectPath, String s, boolean b)
public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException, CatalogException {
throw new UnsupportedOperationException();
NameIdentifier identifier =
NameIdentifier.of(Namespace.of(tablePath.getDatabaseName()), newTableName);

if (catalog().asTableCatalog().tableExists(identifier)) {
throw new TableAlreadyExistException(
catalogName(), ObjectPath.fromString(tablePath.getDatabaseName() + newTableName));
}

try {
catalog()
.asTableCatalog()
.alterTable(
NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName()),
TableChange.rename(newTableName));
} catch (NoSuchTableException e) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(catalogName(), tablePath, e);
}
} catch (Exception e) {
throw new CatalogException(e);
}
}

@Override
public void createTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean b)
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
throw new UnsupportedOperationException();
NameIdentifier identifier =
NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName());

ResolvedCatalogBaseTable<?> resolvedTable = (ResolvedCatalogBaseTable<?>) table;
Column[] columns =
resolvedTable.getResolvedSchema().getColumns().stream()
.map(this::toGravitinoColumn)
.toArray(Column[]::new);
String comment = table.getComment();
Map<String, String> properties =
propertiesConverter.toGravitinoTableProperties(table.getOptions());
try {
catalog().asTableCatalog().createTable(identifier, columns, comment, properties);
} catch (NoSuchSchemaException e) {
throw new DatabaseNotExistException(catalogName(), tablePath.getDatabaseName(), e);
} catch (TableAlreadyExistsException e) {
if (!ignoreIfExists) {
throw new TableAlreadyExistException(catalogName(), tablePath, e);
}
} catch (Exception e) {
throw new CatalogException(e);
}
}

@Override
Expand Down Expand Up @@ -351,6 +437,31 @@ public void alterPartitionColumnStatistics(

protected abstract PropertiesConverter getPropertiesConverter();

/**
 * Converts a Gravitino {@link Table} into a Flink {@link CatalogTable}, carrying over each
 * column's name, type, nullability, and comment, plus the table comment and converted
 * table properties. Partition keys are not mapped (an empty list is used).
 */
protected CatalogBaseTable toFlinkTable(Table table) {
  org.apache.flink.table.api.Schema.Builder schemaBuilder =
      org.apache.flink.table.api.Schema.newBuilder();
  for (Column column : table.columns()) {
    DataType columnType = TypeUtils.toFlinkType(column.dataType());
    DataType typeWithNullability = column.nullable() ? columnType.nullable() : columnType.notNull();
    schemaBuilder.column(column.name(), typeWithNullability).withComment(column.comment());
  }
  return CatalogTable.of(
      schemaBuilder.build(),
      table.comment(),
      ImmutableList.of(),
      propertiesConverter.toFlinkTableProperties(table.properties()));
}

/**
 * Converts a resolved Flink column into a Gravitino {@link Column}. Nullability is read from
 * the Flink logical type; auto-increment is always false and no default value is set.
 */
private Column toGravitinoColumn(org.apache.flink.table.catalog.Column column) {
  // The logical type carries both the value type and its nullability.
  org.apache.flink.table.types.logical.LogicalType logicalType =
      column.getDataType().getLogicalType();
  return Column.of(
      column.getName(),
      TypeUtils.toGravitinoType(logicalType),
      column.getComment().orElse(null),
      logicalType.isNullable(),
      false,
      null);
}

@VisibleForTesting
static SchemaChange[] getSchemaChange(CatalogDatabase current, CatalogDatabase updated) {
Map<String, String> currentProperties = current.getProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@
import com.datastrato.gravitino.flink.connector.catalog.BaseCatalog;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.factories.Factory;
import org.apache.hadoop.hive.conf.HiveConf;

Expand All @@ -43,6 +48,18 @@ public class GravitinoHiveCatalog extends BaseCatalog {
this.hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConf, hiveVersion);
}

/**
 * Opens this catalog and the embedded Flink {@code HiveCatalog}, which must also be open
 * because the statistics methods below delegate to it directly.
 */
@Override
public void open() throws CatalogException {
super.open();
hiveCatalog.open();
}

/** Closes this catalog and then the embedded Flink {@code HiveCatalog}. */
@Override
public void close() throws CatalogException {
super.close();
hiveCatalog.close();
}

/** Returns the Hive configuration of the embedded Flink {@code HiveCatalog}. */
public HiveConf getHiveConf() {
return hiveCatalog.getHiveConf();
}
Expand All @@ -56,4 +73,16 @@ public Optional<Factory> getFactory() {
/** Returns the Hive-specific singleton converter for catalog/schema/table properties. */
protected PropertiesConverter getPropertiesConverter() {
return HivePropertiesConverter.INSTANCE;
}

/**
 * Returns table statistics by delegating to the embedded Flink {@code HiveCatalog}.
 * NOTE(review): assumes {@code open()} has been called so the delegate is usable — confirm.
 */
@Override
public CatalogTableStatistics getTableStatistics(ObjectPath objectPath)
throws TableNotExistException, CatalogException {
return hiveCatalog.getTableStatistics(objectPath);
}

/**
 * Returns column statistics by delegating to the embedded Flink {@code HiveCatalog}.
 * NOTE(review): assumes {@code open()} has been called so the delegate is usable — confirm.
 */
@Override
public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
throws TableNotExistException, CatalogException {
return hiveCatalog.getTableColumnStatistics(tablePath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
package com.datastrato.gravitino.flink.connector.hive;

import com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta;
import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata;
import com.datastrato.gravitino.flink.connector.PropertiesConverter;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.hadoop.hive.conf.HiveConf;
Expand Down Expand Up @@ -76,4 +78,25 @@ public Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitin
});
return flinkCatalogProperties;
}

/**
 * Converts Gravitino table properties to Flink table properties: strips the Hive SERDE
 * parameter prefix from any keys that carry it, keeps all other keys unchanged, and sets
 * the {@code connector} option so Flink routes the table through the Hive connector.
 */
@Override
public Map<String, String> toFlinkTableProperties(Map<String, String> gravitinoProperties) {
  Map<String, String> flinkProperties = Maps.newHashMap();
  for (Map.Entry<String, String> entry : gravitinoProperties.entrySet()) {
    String key = entry.getKey();
    if (key.startsWith(HiveTablePropertiesMetadata.SERDE_PARAMETER_PREFIX)) {
      key = key.substring(HiveTablePropertiesMetadata.SERDE_PARAMETER_PREFIX.length());
    }
    flinkProperties.put(key, entry.getValue());
  }
  flinkProperties.put("connector", "hive");
  return flinkProperties;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.datastrato.gravitino.flink.connector.utils;

import com.datastrato.gravitino.rel.types.Type;
import com.datastrato.gravitino.rel.types.Types;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;

/** Utility methods for converting between Flink logical types and Gravitino types. */
public class TypeUtils {

  // Utility class; never instantiated.
  private TypeUtils() {}

  /**
   * Converts a Flink logical type to the corresponding Gravitino type.
   *
   * <p>Only INTEGER, DOUBLE, and VARCHAR are mapped; nullability is not carried here — it is
   * handled separately by callers.
   *
   * @param logicalType the Flink logical type to convert
   * @return the equivalent Gravitino type
   * @throws UnsupportedOperationException for any unmapped type root
   */
  public static Type toGravitinoType(LogicalType logicalType) {
    switch (logicalType.getTypeRoot()) {
      case INTEGER:
        return Types.IntegerType.get();
      case DOUBLE:
        return Types.DoubleType.get();
      case VARCHAR:
        return Types.StringType.get();
      default:
        throw new UnsupportedOperationException(
            "Not support type: " + logicalType.asSummaryString());
    }
  }

  /**
   * Converts a Gravitino type to the corresponding Flink {@link DataType}.
   *
   * <p>Only INTEGER, DOUBLE, and STRING are mapped.
   *
   * @param gravitinoType the Gravitino type to convert
   * @return the equivalent Flink data type
   * @throws UnsupportedOperationException for any unmapped type
   */
  public static DataType toFlinkType(Type gravitinoType) {
    switch (gravitinoType.name()) {
      case INTEGER:
        return DataTypes.INT();
      case DOUBLE:
        return DataTypes.DOUBLE();
      case STRING:
        return DataTypes.STRING();
      default:
        throw new UnsupportedOperationException("Not support " + gravitinoType.toString());
    }
  }
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
Expand Down
Loading

0 comments on commit 6a3e0f3

Please sign in to comment.