From e357efe88ca6a4079206dc4143768192fe64a4d0 Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Thu, 19 Dec 2024 21:24:46 +0800 Subject: [PATCH] rebase --- flink-connector/flink/build.gradle.kts | 13 +- .../paimon/GravitinoPaimonCatalog.java | 47 +++++ .../paimon/GravitinoPaimonCatalogFactory.java | 81 ++++++++ .../GravitinoPaimonCatalogFactoryOptions.java | 46 +++++ .../paimon/PaimonPropertiesConverter.java | 90 +++++++++ .../org.apache.flink.table.factories.Factory | 3 +- .../integration/test/FlinkCommonIT.java | 28 +++ .../test/paimon/FlinkPaimonCatalogIT.java | 189 ++++++++++++++++++ .../paimon/TestPaimonPropertiesConverter.java | 108 ++++++++++ 9 files changed, 603 insertions(+), 2 deletions(-) create mode 100644 flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java create mode 100644 flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java create mode 100644 flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java create mode 100644 flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java create mode 100644 flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java create mode 100644 flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java diff --git a/flink-connector/flink/build.gradle.kts b/flink-connector/flink/build.gradle.kts index 9e2a48c036c..fafaa760701 100644 --- a/flink-connector/flink/build.gradle.kts +++ b/flink-connector/flink/build.gradle.kts @@ -26,6 +26,7 @@ repositories { mavenCentral() } +var paimonVersion: String = libs.versions.paimon.get() val flinkVersion: String = libs.versions.flink.get() val flinkMajorVersion: String = 
flinkVersion.substringBeforeLast(".") @@ -37,15 +38,22 @@ val flinkMajorVersion: String = flinkVersion.substringBeforeLast(".") val scalaVersion: String = "2.12" val artifactName = "${rootProject.name}-flink-${flinkMajorVersion}_$scalaVersion" +tasks.compileJava { + dependsOn(":catalogs:catalog-lakehouse-paimon:runtimeJars") +} + dependencies { + implementation(project(":core")) implementation(project(":catalogs:catalog-common")) + implementation(libs.guava) compileOnly(project(":clients:client-java-runtime", configuration = "shadow")) - + compileOnly(project(":catalogs:catalog-lakehouse-paimon")) compileOnly("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion") compileOnly("org.apache.flink:flink-table-common:$flinkVersion") compileOnly("org.apache.flink:flink-table-api-java:$flinkVersion") + compileOnly("org.apache.paimon:paimon-flink-1.18:$paimonVersion") compileOnly(libs.hive2.exec) { artifact { @@ -75,6 +83,7 @@ dependencies { testImplementation(project(":clients:client-java")) testImplementation(project(":core")) testImplementation(project(":common")) + testImplementation(project(":catalogs:catalog-lakehouse-paimon")) testImplementation(project(":integration-test-common", "testArtifacts")) testImplementation(project(":server")) testImplementation(project(":server-common")) @@ -90,6 +99,7 @@ dependencies { testImplementation("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion") testImplementation("org.apache.flink:flink-table-common:$flinkVersion") testImplementation("org.apache.flink:flink-table-api-java:$flinkVersion") + testImplementation("org.apache.paimon:paimon-flink-$flinkMajorVersion:$paimonVersion") testImplementation(libs.hive2.exec) { artifact { @@ -170,6 +180,7 @@ tasks.test { } else { dependsOn(tasks.jar) dependsOn(":catalogs:catalog-hive:jar") + dependsOn(":catalogs:catalog-lakehouse-paimon:jar") } } diff --git 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java new file mode 100644 index 00000000000..f403c54e69e --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.paimon; + +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.gravitino.flink.connector.PartitionConverter; +import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.catalog.BaseCatalog; + +/** + * The GravitinoPaimonCatalog class is an implementation of the BaseCatalog class that is used to + * proxy the PaimonCatalog class. 
+ */ +public class GravitinoPaimonCatalog extends BaseCatalog { + + private AbstractCatalog paimonCatalog; + + protected GravitinoPaimonCatalog( + String catalogName, + AbstractCatalog paimonCatalog, + PropertiesConverter propertiesConverter, + PartitionConverter partitionConverter) { + super(catalogName, paimonCatalog.getDefaultDatabase(), propertiesConverter, partitionConverter); + this.paimonCatalog = paimonCatalog; + } + + @Override + protected AbstractCatalog realCatalog() { + return paimonCatalog; + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java new file mode 100644 index 00000000000..a56c1a2854d --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.flink.connector.paimon; + +import com.google.common.collect.ImmutableSet; +import java.util.Collections; +import java.util.Set; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.catalog.Catalog; +import org.apache.gravitino.flink.connector.DefaultPartitionConverter; +import org.apache.gravitino.flink.connector.PartitionConverter; +import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory; +import org.apache.paimon.flink.FlinkCatalog; +import org.apache.paimon.flink.FlinkCatalogFactory; + +/** + * Factory for creating instances of {@link GravitinoPaimonCatalog}. It will be created by SPI + * discovery in Flink. + */ +public class GravitinoPaimonCatalogFactory implements BaseCatalogFactory { + + @Override + public Catalog createCatalog(Context context) { + FlinkCatalog catalog = new FlinkCatalogFactory().createCatalog(context); + return new GravitinoPaimonCatalog( + context.getName(), catalog, propertiesConverter(), partitionConverter()); + } + + @Override + public String factoryIdentifier() { + return GravitinoPaimonCatalogFactoryOptions.IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return ImmutableSet.of(GravitinoPaimonCatalogFactoryOptions.CATALOG_BACKEND); + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + return Collections.emptySet(); + } + + @Override + public String gravitinoCatalogProvider() { + return "lakehouse-paimon"; + } + + @Override + public org.apache.gravitino.Catalog.Type gravitinoCatalogType() { + return org.apache.gravitino.Catalog.Type.RELATIONAL; + } + + @Override + public PropertiesConverter propertiesConverter() { + return PaimonPropertiesConverter.INSTANCE; + } + + @Override + public PartitionConverter partitionConverter() { + return DefaultPartitionConverter.INSTANCE; + } +} diff --git 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java new file mode 100644 index 00000000000..c01b89714aa --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.paimon; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +public class GravitinoPaimonCatalogFactoryOptions { + + /** Identifier for the {@link GravitinoPaimonCatalog}. 
*/ + public static final String IDENTIFIER = "gravitino-paimon"; + + public static ConfigOption<String> CATALOG_BACKEND = + ConfigOptions.key("catalog.backend") + .stringType() + .defaultValue("fileSystem") + .withDescription(""); + + public static ConfigOption<String> WAREHOUSE = + ConfigOptions.key("warehouse").stringType().noDefaultValue(); + + public static ConfigOption<String> URI = ConfigOptions.key("uri").stringType().noDefaultValue(); + + public static ConfigOption<String> JDBC_USER = + ConfigOptions.key("jdbc.user").stringType().noDefaultValue(); + + public static ConfigOption<String> JDBC_PASSWORD = + ConfigOptions.key("jdbc.password").stringType().noDefaultValue(); +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java new file mode 100644 index 00000000000..80285fb1619 --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.flink.connector.paimon; + +import com.google.common.collect.Maps; +import java.util.Map; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogBackend; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig; +import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.paimon.options.CatalogOptions; + +public class PaimonPropertiesConverter implements PropertiesConverter { + + public static final PaimonPropertiesConverter INSTANCE = new PaimonPropertiesConverter(); + + private PaimonPropertiesConverter() {} + + @Override + public Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf) { + Map<String, String> gravitinoCatalogProperties = flinkConf.toMap(); + String warehouse = flinkConf.get(GravitinoPaimonCatalogFactoryOptions.WAREHOUSE); + gravitinoCatalogProperties.put(PaimonConfig.CATALOG_WAREHOUSE.getKey(), warehouse); + String backendType = flinkConf.get(GravitinoPaimonCatalogFactoryOptions.CATALOG_BACKEND); + gravitinoCatalogProperties.put( + PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND, backendType); + if (PaimonCatalogBackend.JDBC.name().equalsIgnoreCase(backendType)) { + gravitinoCatalogProperties.put( + PaimonConfig.CATALOG_URI.getKey(), + flinkConf.get(GravitinoPaimonCatalogFactoryOptions.URI)); + gravitinoCatalogProperties.put( + PaimonConfig.CATALOG_JDBC_USER.getKey(), + flinkConf.get(GravitinoPaimonCatalogFactoryOptions.JDBC_USER)); + gravitinoCatalogProperties.put( + PaimonConfig.CATALOG_JDBC_PASSWORD.getKey(), + flinkConf.get(GravitinoPaimonCatalogFactoryOptions.JDBC_PASSWORD)); + } else if (PaimonCatalogBackend.HIVE.name().equalsIgnoreCase(backendType)) { + throw new UnsupportedOperationException( + "The Gravitino Connector does not currently support creating a Paimon 
Catalog that uses Hive Metastore."); + } + return gravitinoCatalogProperties; + } + + @Override + public Map toFlinkCatalogProperties(Map gravitinoProperties) { + Map flinkCatalogProperties = Maps.newHashMap(); + flinkCatalogProperties.putAll(gravitinoProperties); + String backendType = + flinkCatalogProperties.get(PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND); + if (PaimonCatalogBackend.JDBC.name().equalsIgnoreCase(backendType)) { + flinkCatalogProperties.put(CatalogOptions.METASTORE.key(), backendType); + flinkCatalogProperties.put( + GravitinoPaimonCatalogFactoryOptions.URI.key(), + gravitinoProperties.get(PaimonConfig.CATALOG_URI.getKey())); + flinkCatalogProperties.put( + GravitinoPaimonCatalogFactoryOptions.JDBC_USER.key(), + gravitinoProperties.get(PaimonConfig.CATALOG_JDBC_USER.getKey())); + flinkCatalogProperties.put( + GravitinoPaimonCatalogFactoryOptions.JDBC_PASSWORD.key(), + gravitinoProperties.get(PaimonConfig.CATALOG_JDBC_PASSWORD.getKey())); + } else if (PaimonCatalogBackend.HIVE.name().equalsIgnoreCase(backendType)) { + throw new UnsupportedOperationException( + "The Gravitino Connector does not currently support creating a Paimon Catalog that uses Hive Metastore."); + } + flinkCatalogProperties.put( + GravitinoPaimonCatalogFactoryOptions.CATALOG_BACKEND.key(), backendType); + flinkCatalogProperties.put( + CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoPaimonCatalogFactoryOptions.IDENTIFIER); + return flinkCatalogProperties; + } +} diff --git a/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index c9d9c92b5ef..a535afb6dc2 100644 --- a/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -18,4 +18,5 @@ # 
org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactory -org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactory \ No newline at end of file +org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactory +org.apache.gravitino.flink.connector.paimon.GravitinoPaimonCatalogFactory \ No newline at end of file diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java index 2d022b4a8a4..3c9958ed3b6 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java @@ -53,12 +53,26 @@ import org.apache.gravitino.rel.types.Types; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIf; public abstract class FlinkCommonIT extends FlinkEnvIT { protected abstract Catalog currentCatalog(); + protected boolean supportSchemaOperation() { + return true; + } + + protected boolean supportTableOperation() { + return true; + } + + protected boolean supportColumnOperation() { + return true; + } + @Test + @EnabledIf("supportSchemaOperation") public void testCreateSchema() { doWithCatalog( currentCatalog(), @@ -76,6 +90,7 @@ public void testCreateSchema() { } @Test + @EnabledIf("supportSchemaOperation") public void testGetSchema() { doWithCatalog( currentCatalog(), @@ -110,6 +125,7 @@ public void testGetSchema() { } @Test + @EnabledIf("supportSchemaOperation") public void testListSchema() { doWithCatalog( currentCatalog(), @@ -150,6 +166,7 @@ public void testListSchema() { } @Test + @EnabledIf("supportSchemaOperation") public void testAlterSchema() { doWithCatalog( currentCatalog(), @@ -188,6 +205,7 @@ public void testAlterSchema() { } @Test 
+ @EnabledIf("supportTableOperation") public void testCreateSimpleTable() { String databaseName = "test_create_no_partition_table_db"; String tableName = "test_create_no_partition_table"; @@ -236,6 +254,7 @@ public void testCreateSimpleTable() { } @Test + @EnabledIf("supportTableOperation") public void testListTables() { String newSchema = "test_list_table_catalog"; Column[] columns = new Column[] {Column.of("user_id", Types.IntegerType.get(), "USER_ID")}; @@ -268,6 +287,7 @@ public void testListTables() { } @Test + @EnabledIf("supportTableOperation") public void testDropTable() { String databaseName = "test_drop_table_db"; doWithSchema( @@ -289,6 +309,7 @@ public void testDropTable() { } @Test + @EnabledIf("supportTableOperation") public void testGetSimpleTable() { String databaseName = "test_get_simple_table"; Column[] columns = @@ -342,6 +363,7 @@ public void testGetSimpleTable() { } @Test + @EnabledIf("supportColumnOperation") public void testRenameColumn() { String databaseName = "test_rename_column_db"; String tableName = "test_rename_column"; @@ -377,6 +399,7 @@ public void testRenameColumn() { } @Test + @EnabledIf("supportColumnOperation") public void testAlterTableComment() { String databaseName = "test_alter_table_comment_database"; String tableName = "test_alter_table_comment"; @@ -436,6 +459,7 @@ public void testAlterTableComment() { } @Test + @EnabledIf("supportColumnOperation") public void testAlterTableAddColumn() { String databaseName = "test_alter_table_add_column_db"; String tableName = "test_alter_table_add_column"; @@ -471,6 +495,7 @@ public void testAlterTableAddColumn() { } @Test + @EnabledIf("supportColumnOperation") public void testAlterTableDropColumn() { String databaseName = "test_alter_table_drop_column_db"; String tableName = "test_alter_table_drop_column"; @@ -501,6 +526,7 @@ public void testAlterTableDropColumn() { } @Test + @EnabledIf("supportColumnOperation") public void testAlterColumnTypeAndChangeOrder() { String databaseName = 
"test_alter_table_alter_column_db"; String tableName = "test_alter_table_rename_column"; @@ -542,6 +568,7 @@ public void testAlterColumnTypeAndChangeOrder() { } @Test + @EnabledIf("supportTableOperation") public void testRenameTable() { String databaseName = "test_rename_table_db"; String tableName = "test_rename_table"; @@ -569,6 +596,7 @@ public void testRenameTable() { } @Test + @EnabledIf("supportTableOperation") public void testAlterTableProperties() { String databaseName = "test_alter_table_properties_db"; String tableName = "test_alter_table_properties"; diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java new file mode 100644 index 00000000000..f06746e98ca --- /dev/null +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.flink.connector.integration.test.paimon; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.util.Arrays; +import java.util.Map; +import org.apache.flink.table.api.ResultKind; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.types.Row; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.Schema; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata; +import org.apache.gravitino.flink.connector.integration.test.FlinkCommonIT; +import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class FlinkPaimonCatalogIT extends FlinkCommonIT { + + private static final String DEFAULT_PAIMON_CATALOG = + "test_flink_paimon_filesystem_schema_catalog"; + + private static org.apache.gravitino.Catalog catalog; + + @Override + protected boolean supportColumnOperation() { + return false; + } + + @Override + protected boolean supportTableOperation() { + return false; + } + + @Override + protected boolean supportSchemaOperation() { + return false; + } + + protected Catalog currentCatalog() { + return catalog; + } + + @BeforeAll + static void setup() { + initPaimonCatalog(); + } + + @AfterAll + static void stop() { + Preconditions.checkNotNull(metalake); + metalake.dropCatalog(DEFAULT_PAIMON_CATALOG, true); + } + + private static void initPaimonCatalog() { + Preconditions.checkNotNull(metalake); + catalog = + metalake.createCatalog( + DEFAULT_PAIMON_CATALOG, + org.apache.gravitino.Catalog.Type.RELATIONAL, + "lakehouse-paimon", + null, + ImmutableMap.of( + PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND, + "filesystem", + "warehouse", + "/tmp/gravitino/paimon")); + } + + @Test + public void testCreateSchema() { + doWithCatalog( + 
currentCatalog(), + catalog -> { + String schema = "test_create_schema"; + try { + TableResult tableResult = sql("CREATE DATABASE IF NOT EXISTS %s", schema); + TestUtils.assertTableResult(tableResult, ResultKind.SUCCESS); + catalog.asSchemas().schemaExists(schema); + } finally { + catalog.asSchemas().dropSchema(schema, true); + Assertions.assertFalse(catalog.asSchemas().schemaExists(schema)); + } + }); + } + + @Test + public void testGetSchema() { + doWithCatalog( + currentCatalog(), + catalog -> { + String schema = "test_get_schema"; + try { + TestUtils.assertTableResult( + sql("CREATE DATABASE IF NOT EXISTS %s", schema), ResultKind.SUCCESS); + TestUtils.assertTableResult(tableEnv.executeSql("USE " + schema), ResultKind.SUCCESS); + + catalog.asSchemas().schemaExists(schema); + Schema loadedSchema = catalog.asSchemas().loadSchema(schema); + Assertions.assertEquals(schema, loadedSchema.name()); + } finally { + catalog.asSchemas().dropSchema(schema, true); + Assertions.assertFalse(catalog.asSchemas().schemaExists(schema)); + } + }); + } + + @Test + public void testListSchema() { + doWithCatalog( + currentCatalog(), + catalog -> { + String schema = "test_list_schema"; + String schema2 = "test_list_schema2"; + String schema3 = "test_list_schema3"; + + try { + TestUtils.assertTableResult( + sql("CREATE DATABASE IF NOT EXISTS %s", schema), ResultKind.SUCCESS); + TestUtils.assertTableResult( + sql("CREATE DATABASE IF NOT EXISTS %s", schema2), ResultKind.SUCCESS); + TestUtils.assertTableResult( + sql("CREATE DATABASE IF NOT EXISTS %s", schema3), ResultKind.SUCCESS); + TestUtils.assertTableResult( + sql("SHOW DATABASES"), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of("default"), + Row.of(schema), + Row.of(schema2), + Row.of(schema3)); + + String[] schemas = catalog.asSchemas().listSchemas(); + Arrays.sort(schemas); + Assertions.assertEquals(4, schemas.length); + Assertions.assertEquals("default", schemas[0]); + Assertions.assertEquals(schema, schemas[1]); + 
Assertions.assertEquals(schema2, schemas[2]); + Assertions.assertEquals(schema3, schemas[3]); + } finally { + catalog.asSchemas().dropSchema(schema, true); + catalog.asSchemas().dropSchema(schema2, true); + catalog.asSchemas().dropSchema(schema3, true); + Assertions.assertEquals(1, catalog.asSchemas().listSchemas().length); + } + }); + } + + @Test + public void testCreateGravitinoPaimonCatalogUsingSQL() { + tableEnv.useCatalog(DEFAULT_CATALOG); + int numCatalogs = tableEnv.listCatalogs().length; + String catalogName = "gravitino_hive_sql"; + String warehouse = "/tmp/gravitino/paimon"; + tableEnv.executeSql( + String.format( + "create catalog %s with (" + + "'type'='gravitino-paimon', " + + "'warehouse'='%s'," + + "'catalog.backend'='filesystem'" + + ")", + catalogName, warehouse)); + String[] catalogs = tableEnv.listCatalogs(); + Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a new catalog"); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + org.apache.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName); + Map properties = gravitinoCatalog.properties(); + Assertions.assertEquals(warehouse, properties.get("warehouse")); + } +} diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java new file mode 100644 index 00000000000..5b37c5a3a97 --- /dev/null +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.flink.connector.paimon; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.flink.configuration.Configuration; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** Test for {@link PaimonPropertiesConverter} */ +public class TestPaimonPropertiesConverter { + + private static final PaimonPropertiesConverter CONVERTER = PaimonPropertiesConverter.INSTANCE; + + private static final String localWarehouse = "file:///tmp/paimon_warehouse"; + + @Test + public void testToPaimonFileSystemCatalog() { + Map catalogProperties = ImmutableMap.of("warehouse", localWarehouse); + Map flinkCatalogProperties = + CONVERTER.toFlinkCatalogProperties(catalogProperties); + Assertions.assertEquals( + GravitinoPaimonCatalogFactoryOptions.IDENTIFIER, flinkCatalogProperties.get("type")); + Assertions.assertEquals(localWarehouse, flinkCatalogProperties.get("warehouse")); + } + + @Test + public void testToPaimonJdbcCatalog() { + String testUser = "testUser"; + String testPassword = "testPassword"; + String testUri = "testUri"; + Map catalogProperties = + ImmutableMap.of( + PaimonConfig.CATALOG_WAREHOUSE.getKey(), + localWarehouse, + 
PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND, + "jdbc", + PaimonConfig.CATALOG_JDBC_USER.getKey(), + testUser, + PaimonConfig.CATALOG_JDBC_PASSWORD.getKey(), + testPassword, + PaimonConfig.CATALOG_URI.getKey(), + testUri); + Map flinkCatalogProperties = + CONVERTER.toFlinkCatalogProperties(catalogProperties); + Assertions.assertEquals( + GravitinoPaimonCatalogFactoryOptions.IDENTIFIER, flinkCatalogProperties.get("type")); + Assertions.assertEquals( + localWarehouse, + flinkCatalogProperties.get(GravitinoPaimonCatalogFactoryOptions.WAREHOUSE.key())); + Assertions.assertEquals( + testUser, flinkCatalogProperties.get(GravitinoPaimonCatalogFactoryOptions.JDBC_USER.key())); + Assertions.assertEquals( + testPassword, + flinkCatalogProperties.get(GravitinoPaimonCatalogFactoryOptions.JDBC_PASSWORD.key())); + Assertions.assertEquals( + "jdbc", + flinkCatalogProperties.get(GravitinoPaimonCatalogFactoryOptions.CATALOG_BACKEND.key())); + Assertions.assertEquals( + testUri, flinkCatalogProperties.get(GravitinoPaimonCatalogFactoryOptions.URI.key())); + } + + @Test + public void testToGravitinoCatalogProperties() { + String testUser = "testUser"; + String testPassword = "testPassword"; + String testUri = "testUri"; + Configuration configuration = + Configuration.fromMap( + ImmutableMap.of( + GravitinoPaimonCatalogFactoryOptions.WAREHOUSE.key(), + localWarehouse, + GravitinoPaimonCatalogFactoryOptions.CATALOG_BACKEND.key(), + "jdbc", + GravitinoPaimonCatalogFactoryOptions.JDBC_USER.key(), + testUser, + GravitinoPaimonCatalogFactoryOptions.JDBC_PASSWORD.key(), + testPassword, + GravitinoPaimonCatalogFactoryOptions.URI.key(), + testUri)); + Map properties = CONVERTER.toGravitinoCatalogProperties(configuration); + Assertions.assertEquals( + localWarehouse, properties.get(PaimonConfig.CATALOG_WAREHOUSE.getKey())); + Assertions.assertEquals(testUser, properties.get(PaimonConfig.CATALOG_JDBC_USER.getKey())); + Assertions.assertEquals( + testPassword, 
properties.get(PaimonConfig.CATALOG_JDBC_PASSWORD.getKey())); + Assertions.assertEquals(testUri, properties.get(PaimonConfig.CATALOG_URI.getKey())); + } +}