
[apache#3980] feat(flink-connector): Support partition for Hive table operation

add UTs
coolderli committed Jul 7, 2024
1 parent 34a81df commit a4b42d9
Showing 5 changed files with 129 additions and 7 deletions.
@@ -62,8 +62,6 @@ default Map<String, String> toGravitinoSchemaProperties(Map<String, String> flin
}

/**
* Converts properties from Gravitino schema properties to Flink connector database properties.
*
* @param gravitinoProperties The schema properties provided by Gravitino.
* @return The database properties for the Flink connector.
*/
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datastrato.gravitino.flink.connector;

import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.expressions.transforms.Transforms;
import com.google.common.base.Preconditions;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
* The TransformConverter converts table partitioning between Flink and Gravitino. Flink only
* supports identity transforms. Some table formats, such as Apache Paimon, store the partition
* transform in table properties, so this interface can be implemented to support additional
* partition transforms.
*/
public interface TransformConverter {
default List<String> toFlinkPartitionKeys(Transform[] transforms) {
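// Only identity transforms map directly to Flink partition keys.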
List<String> partitionKeys =
Arrays.stream(transforms)
.filter(t -> t instanceof Transforms.IdentityTransform)
.map(Transform::name)
.collect(Collectors.toList());
Preconditions.checkArgument(
partitionKeys.size() == transforms.length,
"Flink only support identity transform for now.");
return partitionKeys;
}

default Transform[] toGravitinoTransforms(List<String> partitionKeys) {
return partitionKeys.stream().map(Transforms::identity).toArray(Transform[]::new);
}
}
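
As the Javadoc above suggests, a format that records partitioning in table properties can override these defaults. The following is a minimal sketch, assuming a hypothetical "partition.keys" property that holds a comma-separated key list; the class name and property key are illustrative only, not Paimon's actual API.

package com.datastrato.gravitino.flink.connector;

import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.expressions.transforms.Transforms;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical converter for a format that stores partition keys in table properties. */
public class PropertiesTransformConverter implements TransformConverter {

  // Illustrative property name; a real format would define its own.
  private static final String PARTITION_KEYS = "partition.keys";

  private final Map<String, String> tableProperties;

  public PropertiesTransformConverter(Map<String, String> tableProperties) {
    this.tableProperties = tableProperties;
  }

  @Override
  public Transform[] toGravitinoTransforms(List<String> partitionKeys) {
    // Prefer keys recorded in table properties; otherwise fall back to the
    // identity mapping from the interface default.
    String encoded = tableProperties.get(PARTITION_KEYS);
    List<String> keys = encoded == null ? partitionKeys : Arrays.asList(encoded.split(","));
    return keys.stream().map(Transforms::identity).toArray(Transform[]::new);
  }
}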
@@ -31,12 +31,13 @@
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import com.datastrato.gravitino.exceptions.TableAlreadyExistsException;
import com.datastrato.gravitino.flink.connector.PropertiesConverter;
import com.datastrato.gravitino.flink.connector.TransformConverter;
import com.datastrato.gravitino.flink.connector.utils.TypeUtils;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.Table;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import java.util.Arrays;
@@ -77,7 +78,7 @@
* The BaseCatalog that provides a default implementation for all methods in the {@link
* org.apache.flink.table.catalog.Catalog} interface.
*/
public abstract class BaseCatalog extends AbstractCatalog {
public abstract class BaseCatalog extends AbstractCatalog implements TransformConverter {
private final PropertiesConverter propertiesConverter;

protected BaseCatalog(String catalogName, String defaultDatabase) {
@@ -260,8 +261,9 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
String comment = table.getComment();
Map<String, String> properties =
propertiesConverter.toGravitinoTableProperties(table.getOptions());
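// Convert the Flink partition keys to Gravitino identity transforms.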
Transform[] partitions = toGravitinoTransforms(((CatalogTable) table).getPartitionKeys());
try {
catalog().asTableCatalog().createTable(identifier, columns, comment, properties);
catalog().asTableCatalog().createTable(identifier, columns, comment, properties, partitions);
} catch (NoSuchSchemaException e) {
throw new DatabaseNotExistException(catalogName(), tablePath.getDatabaseName(), e);
} catch (TableAlreadyExistsException e) {
@@ -448,8 +450,8 @@ protected CatalogBaseTable toFlinkTable(Table table) {
}
Map<String, String> flinkTableProperties =
propertiesConverter.toFlinkTableProperties(table.properties());
return CatalogTable.of(
builder.build(), table.comment(), ImmutableList.of(), flinkTableProperties);
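// Map the Gravitino partitioning transforms back to Flink partition keys.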
List<String> partitionKeys = toFlinkPartitionKeys(table.partitioning());
return CatalogTable.of(builder.build(), table.comment(), partitionKeys, flinkTableProperties);
}

private Column toGravitinoColumn(org.apache.flink.table.catalog.Column column) {
@@ -492,4 +494,8 @@ private Catalog catalog() {
private String catalogName() {
return getName();
}

private String metalakeName() {
return GravitinoCatalogManager.get().getMetalakeName();
}
}
@@ -1,4 +1,5 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,6 +16,10 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.datastrato.gravitino.flink.connector.utils;
@@ -19,11 +19,19 @@
package com.datastrato.gravitino.flink.connector.integration.test.hive;

import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.METASTORE_URIS;
import static com.datastrato.gravitino.flink.connector.integration.test.utils.TestUtils.assertColumns;

import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.flink.connector.PropertiesConverter;
import com.datastrato.gravitino.flink.connector.hive.GravitinoHiveCatalog;
import com.datastrato.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions;
import com.datastrato.gravitino.flink.connector.integration.test.FlinkCommonIT;
import com.datastrato.gravitino.flink.connector.integration.test.utils.TestUtils;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.Table;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.expressions.transforms.Transforms;
import com.datastrato.gravitino.rel.types.Types;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -32,12 +40,14 @@
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogDescriptor;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
import org.apache.flink.types.Row;
import org.apache.hadoop.hive.conf.HiveConf;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -281,6 +291,59 @@ public void testGetCatalogFromGravitino() {
numCatalogs, tableEnv.listCatalogs().length, "The created catalog should be dropped.");
}

@Test
public void testHivePartitionTable() {
String databaseName = "test_create_hive_partition_table_db";
String tableName = "test_create_hive_partition_table";
String comment = "test comment";
String key = "test key";
String value = "test value";

doWithSchema(
currentCatalog(),
databaseName,
catalog -> {
TableResult result =
sql(
"CREATE TABLE %s "
+ "(string_type STRING COMMENT 'string_type', "
+ " double_type DOUBLE COMMENT 'double_type')"
+ " PARTITIONED BY (string_type, double_type)"
+ " COMMENT '%s' WITH ("
+ "'%s' = '%s')",
tableName, comment, key, value);
TestUtils.assertTableResult(result, ResultKind.SUCCESS);

Table table =
catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName));
Assertions.assertNotNull(table);
Assertions.assertEquals(comment, table.comment());
Assertions.assertEquals(value, table.properties().get(key));
Column[] columns =
new Column[] {
Column.of("string_type", Types.StringType.get(), "string_type", true, false, null),
Column.of("double_type", Types.DoubleType.get(), "double_type")
};
assertColumns(columns, table.columns());
Transform[] partitions =
new Transform[] {
Transforms.identity("string_type"), Transforms.identity("double_type")
};
Assertions.assertArrayEquals(partitions, table.partitioning());

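// -1 is Flink's "unknown" affected-row count for the INSERT job.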
TestUtils.assertTableResult(
sql("INSERT INTO %s VALUES ('A', 1.0), ('B', 2.0)", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(-1L));
TestUtils.assertTableResult(
sql("SELECT * FROM %s", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of("A", 1.0),
Row.of("B", 2.0));
},
true);
}

@Override
protected com.datastrato.gravitino.Catalog currentCatalog() {
return hiveCatalog;
