/**
 * Two-way translation of property maps between the Flink connector and Gravitino.
 *
 * <p>NOTE(review): reconstructed from a patch whose generic type arguments were stripped;
 * restored as {@code Map<String, String>} to match {@code toGravitinoSchemaProperties} —
 * confirm against the original interface. The interface also declares other conversion
 * methods (e.g. catalog/schema properties) not shown here.
 */
public interface PropertiesConverter {

  /**
   * Converts properties from Flink connector table properties to Gravitino table properties.
   *
   * <p>The default implementation passes the map through unchanged.
   *
   * @param flinkProperties the table properties provided by Flink
   * @return the table properties for Gravitino
   */
  default Map<String, String> toGravitinoTableProperties(Map<String, String> flinkProperties) {
    return flinkProperties;
  }

  /**
   * Converts properties from Gravitino schema properties to Flink connector database properties.
   *
   * <p>The default implementation passes the map through unchanged.
   *
   * @param gravitinoProperties the schema properties provided by Gravitino
   * @return the database properties for the Flink connector
   */
  default Map<String, String> toFlinkDatabaseProperties(Map<String, String> gravitinoProperties) {
    return gravitinoProperties;
  }

  /**
   * Converts properties from Gravitino table properties to Flink connector table properties.
   *
   * <p>The default implementation passes the map through unchanged.
   *
   * @param gravitinoProperties the table properties provided by Gravitino
   * @return the table properties for the Flink connector
   */
  default Map<String, String> toFlinkTableProperties(Map<String, String> gravitinoProperties) {
    return gravitinoProperties;
  }
}
org.apache.flink.table.catalog.CatalogFunction; import org.apache.flink.table.catalog.CatalogPartition; import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; @@ -135,8 +147,18 @@ public void alterDatabase( } @Override - public List listTables(String s) throws DatabaseNotExistException, CatalogException { - throw new UnsupportedOperationException(); + public List listTables(String databaseName) + throws DatabaseNotExistException, CatalogException { + try { + return Stream.of( + catalog() + .asTableCatalog() + .listTables(Namespace.of(metalakeName(), catalogName(), databaseName))) + .map(NameIdentifier::name) + .collect(Collectors.toList()); + } catch (NoSuchSchemaException e) { + throw new DatabaseNotExistException(catalogName(), databaseName); + } } @Override @@ -145,32 +167,108 @@ public List listViews(String s) throws DatabaseNotExistException, Catalo } @Override - public CatalogBaseTable getTable(ObjectPath objectPath) + public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); + try { + Table table = + catalog() + .asTableCatalog() + .loadTable( + NameIdentifier.of( + metalakeName(), + catalogName(), + tablePath.getDatabaseName(), + tablePath.getObjectName())); + return toFlinkTable(table); + } catch (NoSuchCatalogException e) { + throw new CatalogException(e); + } } @Override - public boolean tableExists(ObjectPath objectPath) throws CatalogException { - throw new UnsupportedOperationException(); + public boolean tableExists(ObjectPath tablePath) throws CatalogException { + return 
catalog() + .asTableCatalog() + .tableExists( + NameIdentifier.of( + metalakeName(), + catalogName(), + tablePath.getDatabaseName(), + tablePath.getObjectName())); } @Override - public void dropTable(ObjectPath objectPath, boolean b) + public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); + boolean dropped = + catalog() + .asTableCatalog() + .dropTable( + NameIdentifier.of( + metalakeName(), + catalogName(), + tablePath.getDatabaseName(), + tablePath.getObjectName())); + if (!dropped && !ignoreIfNotExists) { + throw new TableNotExistException(catalogName(), tablePath); + } } @Override - public void renameTable(ObjectPath objectPath, String s, boolean b) + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) throws TableNotExistException, TableAlreadyExistException, CatalogException { - throw new UnsupportedOperationException(); + NameIdentifier identifier = + NameIdentifier.of( + Namespace.of(metalakeName(), catalogName(), tablePath.getDatabaseName()), newTableName); + + if (catalog().asTableCatalog().tableExists(identifier)) { + throw new TableAlreadyExistException( + catalogName(), ObjectPath.fromString(tablePath.getDatabaseName() + newTableName)); + } + + try { + catalog() + .asTableCatalog() + .alterTable( + NameIdentifier.of( + metalakeName(), + catalogName(), + tablePath.getDatabaseName(), + tablePath.getObjectName()), + TableChange.rename(newTableName)); + } catch (NoSuchCatalogException e) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName(), tablePath); + } + } } @Override - public void createTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean b) + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { - throw new 
UnsupportedOperationException(); + NameIdentifier identifier = + NameIdentifier.of( + metalakeName(), catalogName(), tablePath.getDatabaseName(), tablePath.getObjectName()); + + ResolvedCatalogBaseTable resolvedTable = (ResolvedCatalogBaseTable) table; + Column[] columns = + resolvedTable.getResolvedSchema().getColumns().stream() + .map(this::toGravitinoColumn) + .toArray(Column[]::new); + String comment = table.getComment(); + Map properties = + propertiesConverter.toGravitinoTableProperties(table.getOptions()); + try { + catalog().asTableCatalog().createTable(identifier, columns, comment, properties); + } catch (NoSuchSchemaException e) { + throw new DatabaseNotExistException(catalogName(), tablePath.getDatabaseName(), e); + } catch (TableAlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName(), tablePath, e); + } + } catch (Exception e) { + throw new CatalogException(e); + } } @Override @@ -337,6 +435,29 @@ public void alterPartitionColumnStatistics( protected abstract PropertiesConverter getPropertiesConverter(); + protected CatalogBaseTable toFlinkTable(Table table) { + + org.apache.flink.table.api.Schema.Builder builder = + org.apache.flink.table.api.Schema.newBuilder(); + for (Column column : table.columns()) { + builder + .column(column.name(), TypeUtils.toFlinkType(column.dataType())) + .withComment(column.comment()); + } + return CatalogTable.of( + builder.build(), table.comment(), ImmutableList.of(), table.properties()); + } + + private Column toGravitinoColumn(org.apache.flink.table.catalog.Column column) { + return Column.of( + column.getName(), + TypeUtils.toGravitinoType(column.getDataType().getLogicalType()), + column.getComment().orElse(null), + column.getDataType().getLogicalType().isNullable(), + false, + null); + } + @VisibleForTesting static SchemaChange[] getSchemaChange(CatalogDatabase current, CatalogDatabase updated) { Map currentProperties = current.getProperties(); @@ -367,4 +488,8 @@ 
private Catalog catalog() { private String catalogName() { return getName(); } + + private String metalakeName() { + return GravitinoCatalogManager.get().getMetalakeName(); + } } diff --git a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/HivePropertiesConverter.java b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/HivePropertiesConverter.java index d8e7c5b2a3e..8d557e4305f 100644 --- a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/HivePropertiesConverter.java +++ b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/HivePropertiesConverter.java @@ -6,10 +6,12 @@ package com.datastrato.gravitino.flink.connector.hive; import com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta; +import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata; import com.datastrato.gravitino.flink.connector.PropertiesConverter; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import java.util.Map; +import java.util.stream.Collectors; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.hadoop.hive.conf.HiveConf; @@ -62,4 +64,22 @@ public Map toFlinkCatalogProperties(Map gravitin }); return flinkCatalogProperties; } + + @Override + public Map toFlinkTableProperties(Map gravitinoProperties) { + return gravitinoProperties.entrySet().stream() + .collect( + Collectors.toMap( + entry -> { + String key = entry.getKey(); + if (key.startsWith(HiveTablePropertiesMetadata.SERDE_PARAMETER_PREFIX)) { + return key.substring( + HiveTablePropertiesMetadata.SERDE_PARAMETER_PREFIX.length()); + } else { + return key; + } + }, + Map.Entry::getValue, + (existingValue, newValue) -> newValue)); + } } diff --git a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/utils/TypeUtils.java 
b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/utils/TypeUtils.java new file mode 100644 index 00000000000..a112ee5a043 --- /dev/null +++ b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/utils/TypeUtils.java @@ -0,0 +1,38 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.flink.connector.utils; + +import com.datastrato.gravitino.rel.types.Type; +import com.datastrato.gravitino.rel.types.Types; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; + +public class TypeUtils { + + private TypeUtils() {} + + public static Type toGravitinoType(LogicalType logicalType) { + switch (logicalType.getTypeRoot()) { + case VARCHAR: + return Types.StringType.get(); + case DOUBLE: + return Types.DoubleType.get(); + default: + throw new UnsupportedOperationException( + "Not support type: " + logicalType.asSummaryString()); + } + } + + public static DataType toFlinkType(Type gravitinoType) { + if (gravitinoType instanceof Types.DoubleType) { + return DataTypes.DOUBLE(); + } else if (gravitinoType instanceof Types.StringType) { + return DataTypes.STRING(); + } + throw new UnsupportedOperationException("Not support " + gravitinoType.toString()); + } +} diff --git a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkEnvIT.java b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkEnvIT.java index 9b30619a352..5b794714c97 100644 --- a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkEnvIT.java +++ b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkEnvIT.java @@ -12,6 +12,7 @@ import com.datastrato.gravitino.integration.test.container.HiveContainer; import 
com.datastrato.gravitino.integration.test.util.AbstractIT; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.errorprone.annotations.FormatMethod; import com.google.errorprone.annotations.FormatString; import java.io.IOException; @@ -141,6 +142,17 @@ protected TableResult sql(@FormatString String sql, Object... args) { return tableEnv.executeSql(String.format(sql, args)); } + protected static void doWithSchema(Catalog catalog, String schemaName, Consumer action) { + Preconditions.checkNotNull(catalog); + Preconditions.checkNotNull(schemaName); + tableEnv.useCatalog(catalog.name()); + if (!catalog.asSchemas().schemaExists(schemaName)) { + catalog.asSchemas().createSchema(schemaName, null, ImmutableMap.of()); + } + tableEnv.useDatabase(schemaName); + action.accept(catalog); + } + protected static void doWithCatalog(Catalog catalog, Consumer action) { Preconditions.checkNotNull(catalog); tableEnv.useCatalog(catalog.name()); diff --git a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java index 00744087a5f..8fc120beb0c 100644 --- a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java +++ b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java @@ -5,24 +5,40 @@ package com.datastrato.gravitino.flink.connector.integration.test.hive; import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.METASTORE_URIS; +import static com.datastrato.gravitino.flink.connector.integration.test.utils.TestUtils.assertColumns; +import static com.datastrato.gravitino.flink.connector.integration.test.utils.TestUtils.toFlinkPhysicalColumn; +import static 
com.datastrato.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM; +import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.flink.connector.PropertiesConverter; import com.datastrato.gravitino.flink.connector.hive.GravitinoHiveCatalog; import com.datastrato.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions; import com.datastrato.gravitino.flink.connector.integration.test.FlinkCommonIT; +import com.datastrato.gravitino.flink.connector.integration.test.utils.TestUtils; +import com.datastrato.gravitino.rel.Column; +import com.datastrato.gravitino.rel.Table; +import com.datastrato.gravitino.rel.types.Types; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import java.util.Arrays; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; +import jline.internal.Preconditions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ResultKind; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions; +import org.apache.flink.types.Row; import org.apache.hadoop.hive.conf.HiveConf; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -32,23 +48,40 @@ @Tag("gravitino-docker-test") public class FlinkHiveCatalogIT extends FlinkCommonIT { + private static final String DEFAULT_HIVE_CATALOG = "test_flink_hive_schema_catalog"; + private static 
final String DEFAULT_HIVE_SCHEMA = "test_flink_hive_schema"; private static com.datastrato.gravitino.Catalog hiveCatalog; @BeforeAll static void hiveStartUp() { + initDefaultHiveCatalog(); + initDefaultHiveSchema(); + } + + @AfterAll + static void hiveStop() { + hiveCatalog.asSchemas().dropSchema(DEFAULT_HIVE_SCHEMA, true); + metalake.dropCatalog(DEFAULT_HIVE_CATALOG); + } + + protected static void initDefaultHiveCatalog() { + Preconditions.checkNotNull(metalake); hiveCatalog = metalake.createCatalog( - "test_flink_hive_schema_catalog", + DEFAULT_HIVE_CATALOG, com.datastrato.gravitino.Catalog.Type.RELATIONAL, "hive", null, ImmutableMap.of("metastore.uris", hiveMetastoreUri)); } - @AfterAll - static void hiveStop() { - metalake.dropCatalog("test_flink_hive_schema_catalog"); + protected static void initDefaultHiveSchema() { + Preconditions.checkNotNull(metalake); + Preconditions.checkNotNull(hiveCatalog); + if (!hiveCatalog.asSchemas().schemaExists(DEFAULT_HIVE_SCHEMA)) { + hiveCatalog.asSchemas().createSchema(DEFAULT_HIVE_SCHEMA, null, ImmutableMap.of()); + } } @Test @@ -258,6 +291,165 @@ public void testGetCatalogFromGravitino() { numCatalogs, tableEnv.listCatalogs().length, "The created catalog should be dropped."); } + @Test + public void testCreateNoPartitionTable() { + String tableName, comment; + tableName = "test_create_no_partition_table"; + comment = "test comment"; + String key = "test key"; + String value = "test value"; + + // 1. The NOT NULL constraint for column is only supported since Hive 3.0, + // but the current Gravitino Hive catalog only supports Hive 2.x. + // 2. Hive doesn't support Time and Timestamp with timezone type. + // 3. Flink SQL only support to create Interval Month and Second(3). 
+ doWithSchema( + hiveCatalog, + DEFAULT_HIVE_SCHEMA, + catalog -> { + TableResult result = + sql( + "CREATE TABLE %s " + + "(string_type STRING COMMENT 'string_type', " + + " double_type DOUBLE COMMENT 'double_type'" + + " COMMENT '%s' WITH (" + + "'%s' = '%s')", + tableName, comment, key, value); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + + Table table = + catalog + .asTableCatalog() + .loadTable( + NameIdentifier.of( + metalake.name(), DEFAULT_HIVE_CATALOG, DEFAULT_HIVE_SCHEMA, tableName)); + Assertions.assertNotNull(table); + Assertions.assertEquals(comment, table.comment()); + Assertions.assertEquals(value, table.properties().get(key)); + Column[] columns = + new Column[] { + Column.of("string_type", Types.StringType.get(), "string_type", true, false, null), + Column.of("double_type", Types.DoubleType.get(), "double_type") + }; + assertColumns(columns, table.columns()); + Assertions.assertArrayEquals(EMPTY_TRANSFORM, table.partitioning()); + }); + } + + @Test + public void testListTables() { + com.datastrato.gravitino.Catalog currentCatalog = metalake.loadCatalog(DEFAULT_HIVE_CATALOG); + String newSchema = "test_list_table_catalog"; + try { + Column[] columns = new Column[] {Column.of("user_id", Types.IntegerType.get(), "USER_ID")}; + doWithSchema( + currentCatalog, + newSchema, + catalog -> { + catalog + .asTableCatalog() + .createTable( + NameIdentifier.of( + metalake.name(), DEFAULT_HIVE_CATALOG, newSchema, "test_table1"), + columns, + "comment1", + ImmutableMap.of()); + catalog + .asTableCatalog() + .createTable( + NameIdentifier.of( + metalake.name(), DEFAULT_HIVE_CATALOG, newSchema, "test_table2"), + columns, + "comment2", + ImmutableMap.of()); + TableResult result = sql("SHOW TABLES"); + TestUtils.assertTableResult( + result, + ResultKind.SUCCESS_WITH_CONTENT, + Row.of("test_table1"), + Row.of("test_table2")); + }); + } finally { + currentCatalog.asSchemas().dropSchema(newSchema, true); + } + } + + @Test + public void testDropTable() { 
+ doWithSchema( + metalake.loadCatalog(DEFAULT_HIVE_CATALOG), + DEFAULT_HIVE_SCHEMA, + catalog -> { + String tableName = "test_drop_table"; + Column[] columns = + new Column[] {Column.of("user_id", Types.IntegerType.get(), "USER_ID")}; + NameIdentifier identifier = + NameIdentifier.of( + metalake.name(), DEFAULT_HIVE_CATALOG, DEFAULT_HIVE_SCHEMA, tableName); + catalog.asTableCatalog().createTable(identifier, columns, "comment1", ImmutableMap.of()); + Assertions.assertTrue(catalog.asTableCatalog().tableExists(identifier)); + + TableResult result = sql("DROP TABLE %s", tableName); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + Assertions.assertFalse(catalog.asTableCatalog().tableExists(identifier)); + }); + } + + @Test + public void testGetTable() { + Column[] columns = + new Column[] { + Column.of("string_type", Types.StringType.get(), "string_type", true, false, null), + Column.of("double_type", Types.DoubleType.get(), "double_type") + }; + + doWithSchema( + metalake.loadCatalog(DEFAULT_HIVE_CATALOG), + DEFAULT_HIVE_SCHEMA, + catalog -> { + String tableName = "test_desc_table"; + String comment = "comment1"; + catalog + .asTableCatalog() + .createTable( + NameIdentifier.of( + metalake.name(), + DEFAULT_HIVE_CATALOG, + DEFAULT_HIVE_SCHEMA, + "test_desc_table"), + columns, + comment, + ImmutableMap.of("k1", "v1")); + + Optional flinkCatalog = + tableEnv.getCatalog(DEFAULT_HIVE_CATALOG); + Assertions.assertTrue(flinkCatalog.isPresent()); + try { + CatalogBaseTable table = + flinkCatalog.get().getTable(new ObjectPath(DEFAULT_HIVE_SCHEMA, tableName)); + Assertions.assertNotNull(table); + Assertions.assertEquals(CatalogBaseTable.TableKind.TABLE, table.getTableKind()); + Assertions.assertEquals(comment, table.getComment()); + + org.apache.flink.table.catalog.Column[] expected = + new org.apache.flink.table.catalog.Column[] { + org.apache.flink.table.catalog.Column.physical("string_type", DataTypes.STRING()) + .withComment("string_type"), + 
org.apache.flink.table.catalog.Column.physical("double_type", DataTypes.DOUBLE()) + .withComment("double_type") + }; + org.apache.flink.table.catalog.Column[] actual = + toFlinkPhysicalColumn(table.getUnresolvedSchema().getColumns()); + Assertions.assertArrayEquals(expected, actual); + + CatalogTable catalogTable = (CatalogTable) table; + Assertions.assertFalse(catalogTable.isPartitioned()); + } catch (TableNotExistException e) { + Assertions.fail(e); + } + }); + } + @Override protected com.datastrato.gravitino.Catalog currentCatalog() { return hiveCatalog; diff --git a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/utils/TestUtils.java b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/utils/TestUtils.java index 50c025e9d3f..c92852ff4cd 100644 --- a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/utils/TestUtils.java +++ b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/utils/TestUtils.java @@ -4,10 +4,13 @@ */ package com.datastrato.gravitino.flink.connector.integration.test.utils; +import com.datastrato.gravitino.rel.Column; import com.google.common.collect.Lists; import java.util.List; import org.apache.flink.table.api.ResultKind; +import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; import org.junit.jupiter.api.Assertions; @@ -29,4 +32,32 @@ public static void assertTableResult( } } } + + public static void assertColumns(Column[] expected, Column[] actual) { + Assertions.assertEquals(expected.length, actual.length); + for (int i = 0; i < expected.length; i++) { + Assertions.assertEquals(expected[i].name(), actual[i].name()); + Assertions.assertEquals(expected[i].comment(), actual[i].comment()); + Assertions.assertEquals( + expected[i].dataType().simpleString(), 
actual[i].dataType().simpleString()); + Assertions.assertEquals(expected[i].defaultValue(), actual[i].defaultValue()); + Assertions.assertEquals(expected[i].autoIncrement(), actual[i].autoIncrement()); + Assertions.assertEquals(expected[i].nullable(), actual[i].nullable()); + } + } + + public static org.apache.flink.table.catalog.Column[] toFlinkPhysicalColumn( + List unresolvedPhysicalColumns) { + return unresolvedPhysicalColumns.stream() + .map( + column -> { + Schema.UnresolvedPhysicalColumn unresolvedPhysicalColumn = + (Schema.UnresolvedPhysicalColumn) column; + return org.apache.flink.table.catalog.Column.physical( + unresolvedPhysicalColumn.getName(), + (DataType) unresolvedPhysicalColumn.getDataType()) + .withComment(unresolvedPhysicalColumn.getComment().orElse(null)); + }) + .toArray(org.apache.flink.table.catalog.Column[]::new); + } }