From 873217554c7022f191013605133c43872e8e5bf6 Mon Sep 17 00:00:00 2001 From: cai can <94670132+caican00@users.noreply.github.com> Date: Mon, 16 Dec 2024 11:36:10 +0800 Subject: [PATCH] [#4722] feat(paimon-spark-connector): support schema and table DDL and table DML for GravitinoPaimonCatalog in paimon spark connector (#5722) ### What changes were proposed in this pull request? support schema and table DDL and table DML for GravitinoPaimonCatalog in paimon spark connector. ### Why are the changes needed? Fix: https://github.com/apache/gravitino/issues/4722 https://github.com/apache/gravitino/issues/4717 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? new Its and UTs. --------- Co-authored-by: caican --- .../lakehouse/paimon/PaimonConstants.java | 57 +++++++++ .../paimon/PaimonPropertiesUtils.java | 95 ++++++++++++++ .../PaimonCatalogPropertiesMetadata.java | 26 ++-- .../PaimonSchemaPropertiesMetadata.java | 2 +- .../paimon/PaimonTablePropertiesMetadata.java | 16 +-- .../storage/PaimonOSSFileSystemConfig.java | 7 +- .../storage/PaimonS3FileSystemConfig.java | 7 +- docs/lakehouse-paimon-catalog.md | 35 +++--- spark-connector/spark-common/build.gradle.kts | 10 ++ .../spark/connector/catalog/BaseCatalog.java | 4 +- .../paimon/GravitinoPaimonCatalog.java | 84 +++++++++++++ .../paimon/PaimonPropertiesConstants.java | 51 ++++++++ .../paimon/PaimonPropertiesConverter.java | 67 ++++++++++ .../connector/paimon/SparkPaimonTable.java | 88 +++++++++++++ .../connector/version/CatalogNameAdaptor.java | 21 +++- .../integration/test/SparkCommonIT.java | 20 +-- .../test/hive/SparkHiveCatalogIT.java | 5 + .../test/iceberg/SparkIcebergCatalogIT.java | 5 + ...SparkPaimonCatalogFilesystemBackendIT.java | 71 +++++++++++ .../test/paimon/SparkPaimonCatalogIT.java | 119 ++++++++++++++++++ .../integration/test/util/SparkTableInfo.java | 7 ++ .../integration/test/util/SparkUtilIT.java | 11 +- .../paimon/TestPaimonPropertiesConverter.java | 106 ++++++++++++++++ spark-connector/v3.3/spark/build.gradle.kts | 11 ++ .../paimon/GravitinoPaimonCatalogSpark33.java | 21 ++++ ...arkPaimonCatalogFilesystemBackendIT33.java | 35 ++++++ .../version/TestCatalogNameAdaptor.java | 4 + spark-connector/v3.4/spark/build.gradle.kts | 11 ++ .../paimon/GravitinoPaimonCatalogSpark34.java | 37 ++++++ ...arkPaimonCatalogFilesystemBackendIT34.java | 36 ++++++ .../version/TestCatalogNameAdaptor.java | 4 + spark-connector/v3.5/spark/build.gradle.kts | 11 ++ .../paimon/GravitinoPaimonCatalogSpark35.java | 21 ++++ ...arkPaimonCatalogFilesystemBackendIT35.java | 36 ++++++ .../version/TestCatalogNameAdaptor.java | 4 + 35 files changed, 1081 insertions(+), 64 deletions(-) create mode 100644 catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonConstants.java create mode 100644 catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java create mode 100644 spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalog.java create mode 100644 spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConstants.java create mode 100644 spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java create mode 100644 spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/SparkPaimonTable.java create mode 100644 spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT.java create mode 100644 spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogIT.java create mode 100644 spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/paimon/TestPaimonPropertiesConverter.java create mode 100644 spark-connector/v3.3/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark33.java create mode 100644 spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT33.java create mode 100644 spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark34.java create mode 100644 spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT34.java create mode 100644 spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark35.java create mode 100644 spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT35.java diff --git a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonConstants.java b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonConstants.java new file mode 100644 index 00000000000..291a7ea9694 --- /dev/null +++ b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonConstants.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.lakehouse.paimon; + +public class PaimonConstants { + + // Paimon catalog properties constants + public static final String CATALOG_BACKEND = "catalog-backend"; + public static final String METASTORE = "metastore"; + public static final String URI = "uri"; + public static final String WAREHOUSE = "warehouse"; + public static final String CATALOG_BACKEND_NAME = "catalog-backend-name"; + + public static final String GRAVITINO_JDBC_USER = "jdbc-user"; + public static final String PAIMON_JDBC_USER = "jdbc.user"; + + public static final String GRAVITINO_JDBC_PASSWORD = "jdbc-password"; + public static final String PAIMON_JDBC_PASSWORD = "jdbc.password"; + + public static final String GRAVITINO_JDBC_DRIVER = "jdbc-driver"; + + // S3 properties needed by Paimon + public static final String S3_ENDPOINT = "s3.endpoint"; + public static final String S3_ACCESS_KEY = "s3.access-key"; + public static final String S3_SECRET_KEY = "s3.secret-key"; + + // OSS related properties + public static final String OSS_ENDPOINT = "fs.oss.endpoint"; + public static final String OSS_ACCESS_KEY = "fs.oss.accessKeyId"; + public static final String OSS_SECRET_KEY = "fs.oss.accessKeySecret"; + + // Iceberg Table properties constants + public static final String COMMENT = "comment"; + public static final String OWNER = "owner"; + public static final String BUCKET_KEY = "bucket-key"; + public static final String MERGE_ENGINE = "merge-engine"; + public static final String SEQUENCE_FIELD = "sequence.field"; + public static final String ROWKIND_FIELD = "rowkind.field"; + public static final String PRIMARY_KEY = "primary-key"; + public static final String PARTITION = "partition"; +} diff --git a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java new file mode 100644 index 00000000000..0dcf24f3a67 --- /dev/null +++ b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.lakehouse.paimon; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import org.apache.gravitino.storage.OSSProperties; +import org.apache.gravitino.storage.S3Properties; + +public class PaimonPropertiesUtils { + + // Map that maintains the mapping of keys in Gravitino to that in Paimon, for example, users + // will only need to set the configuration 'catalog-backend' in Gravitino and Gravitino will + // change it to `catalogType` automatically and pass it to Paimon. + public static final Map GRAVITINO_CONFIG_TO_PAIMON; + + static { + Map map = new HashMap(); + map.put(PaimonConstants.CATALOG_BACKEND, PaimonConstants.CATALOG_BACKEND); + map.put(PaimonConstants.GRAVITINO_JDBC_DRIVER, PaimonConstants.GRAVITINO_JDBC_DRIVER); + map.put(PaimonConstants.GRAVITINO_JDBC_USER, PaimonConstants.PAIMON_JDBC_USER); + map.put(PaimonConstants.GRAVITINO_JDBC_PASSWORD, PaimonConstants.PAIMON_JDBC_PASSWORD); + map.put(PaimonConstants.URI, PaimonConstants.URI); + map.put(PaimonConstants.WAREHOUSE, PaimonConstants.WAREHOUSE); + map.put(PaimonConstants.CATALOG_BACKEND_NAME, PaimonConstants.CATALOG_BACKEND_NAME); + // S3 + map.put(S3Properties.GRAVITINO_S3_ENDPOINT, PaimonConstants.S3_ENDPOINT); + map.put(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, PaimonConstants.S3_ACCESS_KEY); + map.put(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, PaimonConstants.S3_SECRET_KEY); + // OSS + map.put(OSSProperties.GRAVITINO_OSS_ENDPOINT, PaimonConstants.OSS_ENDPOINT); + map.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID, PaimonConstants.OSS_ACCESS_KEY); + map.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET, PaimonConstants.OSS_SECRET_KEY); + GRAVITINO_CONFIG_TO_PAIMON = Collections.unmodifiableMap(map); + } + + /** + * Converts Gravitino properties to Paimon catalog properties, the common transform logic shared + * by Spark connector, Gravitino Paimon catalog. + * + * @param gravitinoProperties a map of Gravitino configuration properties. + * @return a map containing Paimon catalog properties. + */ + public static Map toPaimonCatalogProperties( + Map gravitinoProperties) { + Map paimonProperties = new HashMap<>(); + gravitinoProperties.forEach( + (key, value) -> { + if (GRAVITINO_CONFIG_TO_PAIMON.containsKey(key)) { + paimonProperties.put(GRAVITINO_CONFIG_TO_PAIMON.get(key), value); + } + }); + return paimonProperties; + } + + /** + * Get catalog backend name from Gravitino catalog properties. + * + * @param catalogProperties a map of Gravitino catalog properties. + * @return catalog backend name. + */ + public static String getCatalogBackendName(Map catalogProperties) { + String backendName = catalogProperties.get(PaimonConstants.CATALOG_BACKEND_NAME); + if (backendName != null) { + return backendName; + } + + String catalogBackend = catalogProperties.get(PaimonConstants.CATALOG_BACKEND); + return Optional.ofNullable(catalogBackend) + .map(s -> s.toLowerCase(Locale.ROOT)) + .orElseThrow( + () -> + new UnsupportedOperationException( + String.format("Unsupported catalog backend: %s", catalogBackend))); + } +} diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogPropertiesMetadata.java b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogPropertiesMetadata.java index e3b59bff36d..4c9dcb07a80 100644 --- a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogPropertiesMetadata.java +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogPropertiesMetadata.java @@ -45,20 +45,22 @@ */ public class PaimonCatalogPropertiesMetadata extends BaseCatalogPropertiesMetadata { - @VisibleForTesting public static final String GRAVITINO_CATALOG_BACKEND = "catalog-backend"; - public static final String PAIMON_METASTORE = "metastore"; - public static final String WAREHOUSE = "warehouse"; - public static final String URI = "uri"; - public static final String GRAVITINO_JDBC_USER = "jdbc-user"; - public static final String PAIMON_JDBC_USER = "jdbc.user"; - public static final String GRAVITINO_JDBC_PASSWORD = "jdbc-password"; - public static final String PAIMON_JDBC_PASSWORD = "jdbc.password"; - public static final String GRAVITINO_JDBC_DRIVER = "jdbc-driver"; + @VisibleForTesting + public static final String GRAVITINO_CATALOG_BACKEND = PaimonConstants.CATALOG_BACKEND; + + public static final String PAIMON_METASTORE = PaimonConstants.METASTORE; + public static final String WAREHOUSE = PaimonConstants.WAREHOUSE; + public static final String URI = PaimonConstants.URI; + public static final String GRAVITINO_JDBC_USER = PaimonConstants.GRAVITINO_JDBC_USER; + public static final String PAIMON_JDBC_USER = PaimonConstants.PAIMON_JDBC_USER; + public static final String GRAVITINO_JDBC_PASSWORD = PaimonConstants.GRAVITINO_JDBC_PASSWORD; + public static final String PAIMON_JDBC_PASSWORD = PaimonConstants.PAIMON_JDBC_PASSWORD; + public static final String GRAVITINO_JDBC_DRIVER = PaimonConstants.GRAVITINO_JDBC_DRIVER; // S3 properties needed by Paimon - public static final String S3_ENDPOINT = "s3.endpoint"; - public static final String S3_ACCESS_KEY = "s3.access-key"; - public static final String S3_SECRET_KEY = "s3.secret-key"; + public static final String S3_ENDPOINT = PaimonConstants.S3_ENDPOINT; + public static final String S3_ACCESS_KEY = PaimonConstants.S3_ACCESS_KEY; + public static final String S3_SECRET_KEY = PaimonConstants.S3_SECRET_KEY; public static final Map GRAVITINO_CONFIG_TO_PAIMON = ImmutableMap.of( diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonSchemaPropertiesMetadata.java b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonSchemaPropertiesMetadata.java index 9a6ddb5a165..3da05099cc4 100644 --- a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonSchemaPropertiesMetadata.java +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonSchemaPropertiesMetadata.java @@ -34,7 +34,7 @@ */ public class PaimonSchemaPropertiesMetadata extends BasePropertiesMetadata { - public static final String COMMENT = "comment"; + public static final String COMMENT = PaimonConstants.COMMENT; private static final Map> PROPERTIES_METADATA; diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java index 671dd9d6682..ad63df6783f 100644 --- a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java @@ -35,14 +35,14 @@ */ public class PaimonTablePropertiesMetadata extends BasePropertiesMetadata { - public static final String COMMENT = "comment"; - public static final String OWNER = "owner"; - public static final String BUCKET_KEY = "bucket-key"; - public static final String MERGE_ENGINE = "merge-engine"; - public static final String SEQUENCE_FIELD = "sequence.field"; - public static final String ROWKIND_FIELD = "rowkind.field"; - public static final String PRIMARY_KEY = "primary-key"; - public static final String PARTITION = "partition"; + public static final String COMMENT = PaimonConstants.COMMENT; + public static final String OWNER = PaimonConstants.OWNER; + public static final String BUCKET_KEY = PaimonConstants.BUCKET_KEY; + public static final String MERGE_ENGINE = PaimonConstants.MERGE_ENGINE; + public static final String SEQUENCE_FIELD = PaimonConstants.SEQUENCE_FIELD; + public static final String ROWKIND_FIELD = PaimonConstants.ROWKIND_FIELD; + public static final String PRIMARY_KEY = PaimonConstants.PRIMARY_KEY; + public static final String PARTITION = PaimonConstants.PARTITION; private static final Map> PROPERTIES_METADATA; diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonOSSFileSystemConfig.java b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonOSSFileSystemConfig.java index ad7fa26f3bc..7b703b5b74a 100644 --- a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonOSSFileSystemConfig.java +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonOSSFileSystemConfig.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.Config; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; import org.apache.gravitino.config.ConfigBuilder; import org.apache.gravitino.config.ConfigConstants; import org.apache.gravitino.config.ConfigEntry; @@ -29,9 +30,9 @@ public class PaimonOSSFileSystemConfig extends Config { // OSS related properties - public static final String OSS_ENDPOINT = "fs.oss.endpoint"; - public static final String OSS_ACCESS_KEY = "fs.oss.accessKeyId"; - public static final String OSS_SECRET_KEY = "fs.oss.accessKeySecret"; + public static final String OSS_ENDPOINT = PaimonConstants.OSS_ENDPOINT; + public static final String OSS_ACCESS_KEY = PaimonConstants.OSS_ACCESS_KEY; + public static final String OSS_SECRET_KEY = PaimonConstants.OSS_SECRET_KEY; public PaimonOSSFileSystemConfig(Map properties) { super(false); diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonS3FileSystemConfig.java b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonS3FileSystemConfig.java index 4184fcc06f1..6588e4a5268 100644 --- a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonS3FileSystemConfig.java +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonS3FileSystemConfig.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.Config; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; import org.apache.gravitino.config.ConfigBuilder; import org.apache.gravitino.config.ConfigConstants; import org.apache.gravitino.config.ConfigEntry; @@ -29,9 +30,9 @@ public class PaimonS3FileSystemConfig extends Config { // S3 related properties - public static final String S3_ENDPOINT = "s3.endpoint"; - public static final String S3_ACCESS_KEY = "s3.access-key"; - public static final String S3_SECRET_KEY = "s3.secret-key"; + public static final String S3_ENDPOINT = PaimonConstants.S3_ENDPOINT; + public static final String S3_ACCESS_KEY = PaimonConstants.S3_ACCESS_KEY; + public static final String S3_SECRET_KEY = PaimonConstants.S3_SECRET_KEY; public PaimonS3FileSystemConfig(Map properties) { super(false); diff --git a/docs/lakehouse-paimon-catalog.md b/docs/lakehouse-paimon-catalog.md index d53ad482766..b67fe37db39 100644 --- a/docs/lakehouse-paimon-catalog.md +++ b/docs/lakehouse-paimon-catalog.md @@ -29,23 +29,24 @@ Builds with Apache Paimon `0.8.0`. ### Catalog properties -| Property name | Description | Default value | Required | Since Version | -|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------| -| `catalog-backend` | Catalog backend of Gravitino Paimon catalog. Supports `filesystem`, `jdbc` and `hive`. | (none) | Yes | 0.6.0-incubating | -| `uri` | The URI configuration of the Paimon catalog. `thrift://127.0.0.1:9083` or `jdbc:postgresql://127.0.0.1:5432/db_name` or `jdbc:mysql://127.0.0.1:3306/metastore_db`. It is optional for `FilesystemCatalog`. | (none) | required if the value of `catalog-backend` is not `filesystem`. | 0.6.0-incubating | -| `warehouse` | Warehouse directory of catalog. `file:///user/hive/warehouse-paimon/` for local fs, `hdfs://namespace/hdfs/path` for HDFS , `s3://{bucket-name}/path/` for S3 or `oss://{bucket-name}/path` for Aliyun OSS | (none) | Yes | 0.6.0-incubating | -| `authentication.type` | The type of authentication for Paimon catalog backend, currently Gravitino only supports `Kerberos` and `simple`. | `simple` | No | 0.6.0-incubating | -| `hive.metastore.sasl.enabled` | Whether to enable SASL authentication protocol when connect to Kerberos Hive metastore. This is a raw Hive configuration | `false` | No, This value should be true in most case(Some will use SSL protocol, but it rather rare) if the value of `gravitino.iceberg-rest.authentication.type` is Kerberos. | 0.6.0-incubating | -| `authentication.kerberos.principal` | The principal of the Kerberos authentication. | (none) | required if the value of `authentication.type` is Kerberos. | 0.6.0-incubating | -| `authentication.kerberos.keytab-uri` | The URI of The keytab for the Kerberos authentication. | (none) | required if the value of `authentication.type` is Kerberos. | 0.6.0-incubating | -| `authentication.kerberos.check-interval-sec` | The check interval of Kerberos credential for Paimon catalog. | 60 | No | 0.6.0-incubating | -| `authentication.kerberos.keytab-fetch-timeout-sec` | The fetch timeout of retrieving Kerberos keytab from `authentication.kerberos.keytab-uri`. | 60 | No | 0.6.0-incubating | -| `oss-endpoint` | The endpoint of the Aliyun OSS. | (none) | required if the value of `warehouse` is a OSS path | 0.7.0-incubating | -| `oss-access-key-id` | The access key of the Aliyun OSS. | (none) | required if the value of `warehouse` is a OSS path | 0.7.0-incubating | -| `oss-accesss-key-secret` | The secret key the Aliyun OSS. | (none) | required if the value of `warehouse` is a OSS path | 0.7.0-incubating | -| `s3-endpoint` | The endpoint of the AWS S3. | (none) | required if the value of `warehouse` is a S3 path | 0.7.0-incubating | -| `s3-access-key-id` | The access key of the AWS S3. | (none) | required if the value of `warehouse` is a S3 path | 0.7.0-incubating | -| `s3-secret-access-key` | The secret key of the AWS S3. | (none) | required if the value of `warehouse` is a S3 path | 0.7.0-incubating | +| Property name | Description | Default value | Required | Since Version | +|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------| +| `catalog-backend` | Catalog backend of Gravitino Paimon catalog. Supports `filesystem`, `jdbc` and `hive`. | (none) | Yes | 0.6.0-incubating | +| `uri` | The URI configuration of the Paimon catalog. `thrift://127.0.0.1:9083` or `jdbc:postgresql://127.0.0.1:5432/db_name` or `jdbc:mysql://127.0.0.1:3306/metastore_db`. It is optional for `FilesystemCatalog`. | (none) | required if the value of `catalog-backend` is not `filesystem`. | 0.6.0-incubating | +| `warehouse` | Warehouse directory of catalog. `file:///user/hive/warehouse-paimon/` for local fs, `hdfs://namespace/hdfs/path` for HDFS , `s3://{bucket-name}/path/` for S3 or `oss://{bucket-name}/path` for Aliyun OSS | (none) | Yes | 0.6.0-incubating | +| `catalog-backend-name` | The catalog name passed to underlying Paimon catalog backend. | The property value of `catalog-backend`, like `jdbc` for JDBC catalog backend. | No | 0.8.0-incubating | +| `authentication.type` | The type of authentication for Paimon catalog backend, currently Gravitino only supports `Kerberos` and `simple`. | `simple` | No | 0.6.0-incubating | +| `hive.metastore.sasl.enabled` | Whether to enable SASL authentication protocol when connect to Kerberos Hive metastore. This is a raw Hive configuration | `false` | No, This value should be true in most case(Some will use SSL protocol, but it rather rare) if the value of `gravitino.iceberg-rest.authentication.type` is Kerberos. | 0.6.0-incubating | +| `authentication.kerberos.principal` | The principal of the Kerberos authentication. | (none) | required if the value of `authentication.type` is Kerberos. | 0.6.0-incubating | +| `authentication.kerberos.keytab-uri` | The URI of The keytab for the Kerberos authentication. | (none) | required if the value of `authentication.type` is Kerberos. | 0.6.0-incubating | +| `authentication.kerberos.check-interval-sec` | The check interval of Kerberos credential for Paimon catalog. | 60 | No | 0.6.0-incubating | +| `authentication.kerberos.keytab-fetch-timeout-sec` | The fetch timeout of retrieving Kerberos keytab from `authentication.kerberos.keytab-uri`. | 60 | No | 0.6.0-incubating | +| `oss-endpoint` | The endpoint of the Aliyun OSS. | (none) | required if the value of `warehouse` is a OSS path | 0.7.0-incubating | +| `oss-access-key-id` | The access key of the Aliyun OSS. | (none) | required if the value of `warehouse` is a OSS path | 0.7.0-incubating | +| `oss-accesss-key-secret` | The secret key the Aliyun OSS. | (none) | required if the value of `warehouse` is a OSS path | 0.7.0-incubating | +| `s3-endpoint` | The endpoint of the AWS S3. | (none) | required if the value of `warehouse` is a S3 path | 0.7.0-incubating | +| `s3-access-key-id` | The access key of the AWS S3. | (none) | required if the value of `warehouse` is a S3 path | 0.7.0-incubating | +| `s3-secret-access-key` | The secret key of the AWS S3. | (none) | required if the value of `warehouse` is a S3 path | 0.7.0-incubating | :::note If you want to use the `oss` or `s3` warehouse, you need to place related jars in the `catalogs/lakehouse-paimon/lib` directory, more information can be found in the [Paimon S3](https://paimon.apache.org/docs/master/filesystems/s3/). diff --git a/spark-connector/spark-common/build.gradle.kts b/spark-connector/spark-common/build.gradle.kts index 7f3c66aa6e6..06e0077d21e 100644 --- a/spark-connector/spark-common/build.gradle.kts +++ b/spark-connector/spark-common/build.gradle.kts @@ -31,6 +31,7 @@ val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extr val sparkVersion: String = libs.versions.spark33.get() val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".") val icebergVersion: String = libs.versions.iceberg4spark.get() +val paimonVersion: String = libs.versions.paimon.get() // kyuubi hive connector for Spark 3.3 doesn't support scala 2.13 val kyuubiVersion: String = libs.versions.kyuubi4spark34.get() val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get() @@ -43,6 +44,9 @@ dependencies { compileOnly(project(":clients:client-java-runtime", configuration = "shadow")) compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion") compileOnly("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion") + compileOnly("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") { + exclude("org.apache.spark") + } compileOnly("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion") compileOnly("org.apache.spark:spark-core_$scalaVersion:$sparkVersion") @@ -114,6 +118,9 @@ dependencies { testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion") testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion") testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion") + testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") { + exclude("org.apache.spark") + } testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion") // include spark-sql,spark-catalyst,hive-common,hdfs-client testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") { @@ -123,6 +130,9 @@ dependencies { exclude("org.glassfish.jersey.inject") } testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion") + testImplementation("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion") + testImplementation("org.apache.spark:spark-core_$scalaVersion:$sparkVersion") + testImplementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion") testRuntimeOnly(libs.junit.jupiter.engine) } diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java index 2201bd222be..5706895caa4 100644 --- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java +++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java @@ -76,11 +76,11 @@ public abstract class BaseCatalog implements TableCatalog, SupportsNamespaces { protected TableCatalog sparkCatalog; protected PropertiesConverter propertiesConverter; protected SparkTransformConverter sparkTransformConverter; + // The Gravitino catalog client to do schema operations. + protected Catalog gravitinoCatalogClient; private SparkTypeConverter sparkTypeConverter; private SparkTableChangeConverter sparkTableChangeConverter; - // The Gravitino catalog client to do schema operations. - private Catalog gravitinoCatalogClient; private String catalogName; private final GravitinoCatalogManager gravitinoCatalogManager; diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalog.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalog.java new file mode 100644 index 00000000000..86ca680c45b --- /dev/null +++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalog.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.spark.connector.paimon; + +import java.util.Map; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils; +import org.apache.gravitino.spark.connector.PropertiesConverter; +import org.apache.gravitino.spark.connector.SparkTransformConverter; +import org.apache.gravitino.spark.connector.SparkTypeConverter; +import org.apache.gravitino.spark.connector.catalog.BaseCatalog; +import org.apache.paimon.spark.SparkCatalog; +import org.apache.paimon.spark.SparkTable; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.Table; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +public class GravitinoPaimonCatalog extends BaseCatalog { + + @Override + protected TableCatalog createAndInitSparkCatalog( + String name, CaseInsensitiveStringMap options, Map properties) { + String catalogBackendName = PaimonPropertiesUtils.getCatalogBackendName(properties); + TableCatalog paimonCatalog = new SparkCatalog(); + Map all = + getPropertiesConverter().toSparkCatalogProperties(options, properties); + paimonCatalog.initialize(catalogBackendName, new CaseInsensitiveStringMap(all)); + return paimonCatalog; + } + + @Override + protected Table createSparkTable( + Identifier identifier, + org.apache.gravitino.rel.Table gravitinoTable, + Table sparkTable, + TableCatalog sparkCatalog, + PropertiesConverter propertiesConverter, + SparkTransformConverter sparkTransformConverter, + SparkTypeConverter sparkTypeConverter) { + return new SparkPaimonTable( + identifier, + gravitinoTable, + (SparkTable) sparkTable, + propertiesConverter, + sparkTransformConverter, + sparkTypeConverter); + } + + @Override + protected PropertiesConverter getPropertiesConverter() { + return PaimonPropertiesConverter.getInstance(); + } + + @Override + protected SparkTransformConverter getSparkTransformConverter() { + return new SparkTransformConverter(true); + } + + @Override + public boolean dropTable(Identifier ident) { + sparkCatalog.invalidateTable(ident); + return gravitinoCatalogClient + .asTableCatalog() + .purgeTable(NameIdentifier.of(getDatabase(ident), ident.name())); + } +} diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConstants.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConstants.java new file mode 100644 index 00000000000..915308ae8df --- /dev/null +++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConstants.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.spark.connector.paimon; + +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; + +public class PaimonPropertiesConstants { + + public static final String GRAVITINO_PAIMON_CATALOG_BACKEND = PaimonConstants.CATALOG_BACKEND; + static final String PAIMON_CATALOG_METASTORE = PaimonConstants.METASTORE; + + public static final String GRAVITINO_PAIMON_CATALOG_WAREHOUSE = PaimonConstants.WAREHOUSE; + static final String PAIMON_CATALOG_WAREHOUSE = PaimonConstants.WAREHOUSE; + + public static final String GRAVITINO_PAIMON_CATALOG_URI = PaimonConstants.URI; + static final String PAIMON_CATALOG_URI = PaimonConstants.URI; + static final String GRAVITINO_PAIMON_CATALOG_JDBC_USER = PaimonConstants.GRAVITINO_JDBC_USER; + static final String PAIMON_CATALOG_JDBC_USER = PaimonConstants.PAIMON_JDBC_USER; + + static final String GRAVITINO_PAIMON_CATALOG_JDBC_PASSWORD = + PaimonConstants.GRAVITINO_JDBC_PASSWORD; + static final String PAIMON_CATALOG_JDBC_PASSWORD = PaimonConstants.PAIMON_JDBC_PASSWORD; + + public static final String PAIMON_CATALOG_BACKEND_HIVE = "hive"; + static final String GRAVITINO_PAIMON_CATALOG_BACKEND_HIVE = "hive"; + + static final String GRAVITINO_PAIMON_CATALOG_BACKEND_JDBC = "jdbc"; + static final String PAIMON_CATALOG_BACKEND_JDBC = "jdbc"; + + public static final String PAIMON_CATALOG_BACKEND_FILESYSTEM = "filesystem"; + static final String GRAVITINO_PAIMON_CATALOG_BACKEND_FILESYSTEM = "filesystem"; + + public static final String PAIMON_TABLE_LOCATION = "path"; +} diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java new file mode 100644 index 00000000000..f713ca89ddd --- /dev/null +++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.spark.connector.paimon; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils; +import org.apache.gravitino.spark.connector.PropertiesConverter; + +public class PaimonPropertiesConverter implements PropertiesConverter { + + public static class PaimonPropertiesConverterHolder { + private static final PaimonPropertiesConverter INSTANCE = new PaimonPropertiesConverter(); + } + + private PaimonPropertiesConverter() {} + + public static PaimonPropertiesConverter getInstance() { + return PaimonPropertiesConverter.PaimonPropertiesConverterHolder.INSTANCE; + } + + @Override + public Map toSparkCatalogProperties(Map properties) { + Preconditions.checkArgument(properties != null, "Paimon Catalog properties should not be null"); + Map all = PaimonPropertiesUtils.toPaimonCatalogProperties(properties); + String catalogBackend = all.remove(PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND); + Preconditions.checkArgument( + StringUtils.isNotBlank(catalogBackend), + String.format( + "%s should not be empty", PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND)); + all.put(PaimonPropertiesConstants.PAIMON_CATALOG_METASTORE, catalogBackend); + return all; + } + + @Override + public Map toGravitinoTableProperties(Map properties) { + HashMap gravitinoTableProperties = new HashMap<>(properties); + // The owner property of Paimon is a reserved property, so we need to remove it. + gravitinoTableProperties.remove(PaimonConstants.OWNER); + return gravitinoTableProperties; + } + + @Override + public Map toSparkTableProperties(Map properties) { + return new HashMap<>(properties); + } +} diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/SparkPaimonTable.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/SparkPaimonTable.java new file mode 100644 index 00000000000..f1db29b71bc --- /dev/null +++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/SparkPaimonTable.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.spark.connector.paimon; + +import java.util.Map; +import org.apache.gravitino.rel.Table; +import org.apache.gravitino.spark.connector.PropertiesConverter; +import org.apache.gravitino.spark.connector.SparkTransformConverter; +import org.apache.gravitino.spark.connector.SparkTypeConverter; +import org.apache.gravitino.spark.connector.utils.GravitinoTableInfoHelper; +import org.apache.paimon.spark.SparkTable; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.connector.read.ScanBuilder; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +public class SparkPaimonTable extends SparkTable { + + private GravitinoTableInfoHelper gravitinoTableInfoHelper; + private org.apache.spark.sql.connector.catalog.Table sparkTable; + + public SparkPaimonTable( + Identifier identifier, + Table gravitinoTable, + SparkTable sparkTable, + PropertiesConverter propertiesConverter, + SparkTransformConverter sparkTransformConverter, + SparkTypeConverter sparkTypeConverter) { + super(sparkTable.getTable()); + this.gravitinoTableInfoHelper = + new GravitinoTableInfoHelper( + true, + identifier, + gravitinoTable, + propertiesConverter, + sparkTransformConverter, + sparkTypeConverter); + this.sparkTable = sparkTable; + } + + @Override + public String name() { + return gravitinoTableInfoHelper.name(); + } + + @Override + @SuppressWarnings("deprecation") + public StructType schema() { + return gravitinoTableInfoHelper.schema(); + } + + @Override + public Map properties() { + return gravitinoTableInfoHelper.properties(); + } + + @Override + public Transform[] partitioning() { + return gravitinoTableInfoHelper.partitioning(); + } + + /** + * If using SparkPaimonTable not SparkTable, we must extract snapshotId or branchName using the + * Paimon specific logic. It's hard to maintenance. + */ + @Override + public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { + return ((SparkTable) sparkTable).newScanBuilder(options); + } +} diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/version/CatalogNameAdaptor.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/version/CatalogNameAdaptor.java index 8141c799bf8..9392feac2f1 100644 --- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/version/CatalogNameAdaptor.java +++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/version/CatalogNameAdaptor.java @@ -27,15 +27,24 @@ public class CatalogNameAdaptor { private static final Map catalogNames = ImmutableMap.of( - "hive-3.3", "org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33", - "hive-3.4", "org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark34", - "hive-3.5", "org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark35", + "hive-3.3", + "org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33", + "hive-3.4", + "org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark34", + "hive-3.5", + "org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark35", "lakehouse-iceberg-3.3", - "org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33", + "org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33", "lakehouse-iceberg-3.4", - "org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark34", + "org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark34", "lakehouse-iceberg-3.5", - "org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark35"); + "org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark35", + "lakehouse-paimon-3.3", + "org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark33", + "lakehouse-paimon-3.4", + "org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark34", + "lakehouse-paimon-3.5", + "org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark35"); private static String sparkVersion() { return package$.MODULE$.SPARK_VERSION(); diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java index 63e4801ef94..c7517a3bf82 100644 --- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java +++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java @@ -117,6 +117,8 @@ private static String getRowLevelDeleteTableSql( protected abstract boolean supportsSchemaEvolution(); + protected abstract boolean supportsReplaceColumns(); + // Use a custom database not the original default database because SparkCommonIT couldn't // read&write data to tables in default database. The main reason is default database location is // determined by `hive.metastore.warehouse.dir` in hive-site.xml which is local HDFS address @@ -146,7 +148,7 @@ void initDefaultDatabase() throws IOException { throw e; } sql("USE " + getCatalogName()); - createDatabaseIfNotExists(getDefaultDatabase()); + createDatabaseIfNotExists(getDefaultDatabase(), getProvider()); } @BeforeEach @@ -187,7 +189,7 @@ void testLoadCatalogs() { } @Test - void testCreateAndLoadSchema() { + protected void testCreateAndLoadSchema() { String testDatabaseName = "t_create1"; dropDatabaseIfExists(testDatabaseName); sql("CREATE DATABASE " + testDatabaseName + " WITH DBPROPERTIES (ID=001);"); @@ -216,7 +218,7 @@ void testCreateAndLoadSchema() { } @Test - void testAlterSchema() { + protected void testAlterSchema() { String testDatabaseName = "t_alter"; dropDatabaseIfExists(testDatabaseName); sql("CREATE DATABASE " + testDatabaseName + " WITH DBPROPERTIES (ID=001);"); @@ -240,6 +242,7 @@ void testAlterSchema() { @Test void testDropSchema() { String testDatabaseName = "t_drop"; + dropDatabaseIfExists(testDatabaseName); Set databases = getDatabases(); Assertions.assertFalse(databases.contains(testDatabaseName)); @@ -277,7 +280,7 @@ void testCreateTableWithDatabase() { // test db.table as table identifier String databaseName = "db1"; String tableName = "table1"; - createDatabaseIfNotExists(databaseName); + createDatabaseIfNotExists(databaseName, getProvider()); String tableIdentifier = String.join(".", databaseName, tableName); dropTableIfExists(tableIdentifier); @@ -291,7 +294,7 @@ void testCreateTableWithDatabase() { // use db then create table with table name databaseName = "db2"; tableName = "table2"; - createDatabaseIfNotExists(databaseName); + createDatabaseIfNotExists(databaseName, getProvider()); sql("USE " + databaseName); dropTableIfExists(tableName); @@ -379,7 +382,7 @@ void testListTable() { String database = "db_list"; String table3 = "list3"; String table4 = "list4"; - createDatabaseIfNotExists(database); + createDatabaseIfNotExists(database, getProvider()); dropTableIfExists(String.join(".", database, table3)); dropTableIfExists(String.join(".", database, table4)); createSimpleTable(String.join(".", database, table3)); @@ -550,7 +553,8 @@ void testAlterTableUpdateColumnComment() { } @Test - void testAlterTableReplaceColumns() { + @EnabledIf("supportsReplaceColumns") + protected void testAlterTableReplaceColumns() { String tableName = "test_replace_columns_table"; dropTableIfExists(tableName); @@ -563,7 +567,7 @@ void testAlterTableReplaceColumns() { sql( String.format( - "ALTER TABLE %S REPLACE COLUMNS (id int COMMENT 'new comment', name2 string, age long);", + "ALTER TABLE %s REPLACE COLUMNS (id int COMMENT 'new comment', name2 string, age long);", tableName)); ArrayList updateColumns = new ArrayList<>(); // change comment for id diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java index c543d82819e..b95882a0d01 100644 --- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java +++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java @@ -79,6 +79,11 @@ protected boolean supportsSchemaEvolution() { return false; } + @Override + protected boolean supportsReplaceColumns() { + return true; + } + @Test void testCreateHiveFormatPartitionTable() { String tableName = "hive_partition_table"; diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java index 52f4abf3a98..f5fd337a13d 100644 --- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java +++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java @@ -104,6 +104,11 @@ protected boolean supportsSchemaEvolution() { return true; } + @Override + protected boolean supportsReplaceColumns() { + return true; + } + @Override protected String getTableLocation(SparkTableInfo table) { return String.join(File.separator, table.getTableLocation(), "data"); diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT.java new file mode 100644 index 00000000000..3d4a3257a91 --- /dev/null +++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.spark.connector.integration.test.paimon; + +import com.google.common.collect.Maps; +import java.util.Map; +import org.apache.gravitino.spark.connector.paimon.PaimonPropertiesConstants; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +/** This class use Apache Paimon FilesystemCatalog for backend catalog. */ +@Tag("gravitino-docker-test") +public abstract class SparkPaimonCatalogFilesystemBackendIT extends SparkPaimonCatalogIT { + + @Override + protected Map getCatalogConfigs() { + Map catalogProperties = Maps.newHashMap(); + catalogProperties.put( + PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND, + PaimonPropertiesConstants.PAIMON_CATALOG_BACKEND_FILESYSTEM); + catalogProperties.put(PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_WAREHOUSE, warehouse); + return catalogProperties; + } + + @Test + @Override + protected void testCreateAndLoadSchema() { + String testDatabaseName = "t_create1"; + dropDatabaseIfExists(testDatabaseName); + sql("CREATE DATABASE " + testDatabaseName + " WITH DBPROPERTIES (ID=001);"); + Map databaseMeta = getDatabaseMetadata(testDatabaseName); + // The database of the Paimon filesystem backend do not store any properties. + Assertions.assertFalse(databaseMeta.containsKey("ID")); + } + + @Test + @Override + protected void testAlterSchema() { + String testDatabaseName = "t_alter"; + dropDatabaseIfExists(testDatabaseName); + sql("CREATE DATABASE " + testDatabaseName + " WITH DBPROPERTIES (ID=001);"); + Map databaseMeta = getDatabaseMetadata(testDatabaseName); + // The database of the Paimon filesystem backend do not store any properties. + Assertions.assertFalse(databaseMeta.containsKey("ID")); + + // The Paimon filesystem backend do not support alter database operation. + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> + sql( + String.format( + "ALTER DATABASE %s SET DBPROPERTIES ('ID'='002')", testDatabaseName))); + } +} diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogIT.java new file mode 100644 index 00000000000..c77a4642eec --- /dev/null +++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogIT.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.spark.connector.integration.test.paimon; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.gravitino.spark.connector.integration.test.SparkCommonIT; +import org.apache.gravitino.spark.connector.integration.test.util.SparkTableInfo; +import org.apache.gravitino.spark.connector.integration.test.util.SparkTableInfoChecker; +import org.apache.gravitino.spark.connector.paimon.PaimonPropertiesConstants; +import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.types.DataTypes; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public abstract class SparkPaimonCatalogIT extends SparkCommonIT { + + @Override + protected String getCatalogName() { + return "paimon"; + } + + @Override + protected String getProvider() { + return "lakehouse-paimon"; + } + + @Override + protected boolean supportsSparkSQLClusteredBy() { + return false; + } + + @Override + protected boolean supportsPartition() { + return true; + } + + @Override + protected boolean supportsDelete() { + return false; + } + + @Override + protected boolean supportsSchemaEvolution() { + return true; + } + + @Override + protected boolean supportsReplaceColumns() { + // Paimon doesn't support replace columns, because it doesn't support drop all fields in table. + // And `ALTER TABLE REPLACE COLUMNS` statement will remove all existing columns at first and + // then adds the new set of columns. + return false; + } + + @Override + protected String getTableLocation(SparkTableInfo table) { + Map tableProperties = table.getTableProperties(); + return tableProperties.get(PaimonPropertiesConstants.PAIMON_TABLE_LOCATION); + } + + @Test + void testPaimonPartitions() { + String partitionPathString = "name=a/address=beijing"; + + String tableName = "test_paimon_partition_table"; + dropTableIfExists(tableName); + String createTableSQL = getCreatePaimonSimpleTableString(tableName); + createTableSQL = createTableSQL + " PARTITIONED BY (name, address);"; + sql(createTableSQL); + SparkTableInfo tableInfo = getTableInfo(tableName); + SparkTableInfoChecker checker = + SparkTableInfoChecker.create() + .withName(tableName) + .withColumns(getPaimonSimpleTableColumn()) + .withIdentifyPartition(Collections.singletonList("name")) + .withIdentifyPartition(Collections.singletonList("address")); + checker.check(tableInfo); + + String insertData = String.format("INSERT into %s values(2,'a','beijing');", tableName); + sql(insertData); + List queryResult = getTableData(tableName); + Assertions.assertEquals(1, queryResult.size()); + Assertions.assertEquals("2,a,beijing", queryResult.get(0)); + Path partitionPath = new Path(getTableLocation(tableInfo), partitionPathString); + checkDirExists(partitionPath); + } + + private String getCreatePaimonSimpleTableString(String tableName) { + return String.format( + "CREATE TABLE %s (id INT COMMENT 'id comment', name STRING COMMENT '', address STRING COMMENT '') USING paimon", + tableName); + } + + private List getPaimonSimpleTableColumn() { + return Arrays.asList( + SparkTableInfo.SparkColumnInfo.of("id", DataTypes.IntegerType, "id comment"), + SparkTableInfo.SparkColumnInfo.of("name", DataTypes.StringType, ""), + SparkTableInfo.SparkColumnInfo.of("address", DataTypes.StringType, "")); + } +} diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfo.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfo.java index 38b21ddf057..077936c29c5 100644 --- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfo.java +++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfo.java @@ -31,6 +31,7 @@ import org.apache.gravitino.spark.connector.ConnectorConstants; import org.apache.gravitino.spark.connector.hive.SparkHiveTable; import org.apache.gravitino.spark.connector.iceberg.SparkIcebergTable; +import org.apache.gravitino.spark.connector.paimon.SparkPaimonTable; import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns; import org.apache.spark.sql.connector.catalog.Table; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -71,6 +72,10 @@ public String getTableLocation() { return tableProperties.get(TableCatalog.PROP_LOCATION); } + public Map getTableProperties() { + return tableProperties; + } + // Include database name and table name public String getTableIdentifier() { if (StringUtils.isNotBlank(database)) { @@ -186,6 +191,8 @@ private static StructType getSchema(Table baseTable) { return ((SparkHiveTable) baseTable).schema(); } else if (baseTable instanceof SparkIcebergTable) { return ((SparkIcebergTable) baseTable).schema(); + } else if (baseTable instanceof SparkPaimonTable) { + return ((SparkPaimonTable) baseTable).schema(); } else { throw new IllegalArgumentException( "Doesn't support Spark table: " + baseTable.getClass().getName()); diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java index 646f414841b..ed7d2085ffd 100644 --- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java +++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java @@ -74,10 +74,13 @@ protected void dropDatabaseIfExists(String database) { // Specify Location explicitly because the default location is local HDFS, Spark will expand the // location to HDFS. - protected void createDatabaseIfNotExists(String database) { - sql( - String.format( - "CREATE DATABASE IF NOT EXISTS %s LOCATION '/user/hive/%s'", database, database)); + // However, Paimon does not support create a database with a specified location. + protected void createDatabaseIfNotExists(String database, String provider) { + String locationClause = + "lakehouse-paimon".equalsIgnoreCase(provider) + ? "" + : String.format("LOCATION '/user/hive/%s'", database); + sql(String.format("CREATE DATABASE IF NOT EXISTS %s %s", database, locationClause)); } protected Map getDatabaseMetadata(String database) { diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/paimon/TestPaimonPropertiesConverter.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/paimon/TestPaimonPropertiesConverter.java new file mode 100644 index 00000000000..a3a0e91284a --- /dev/null +++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/paimon/TestPaimonPropertiesConverter.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.spark.connector.paimon; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestPaimonPropertiesConverter { + private final PaimonPropertiesConverter paimonPropertiesConverter = + PaimonPropertiesConverter.getInstance(); + + @Test + void testCatalogPropertiesWithHiveBackend() { + Map properties = + paimonPropertiesConverter.toSparkCatalogProperties( + ImmutableMap.of( + PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND, + PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND_HIVE, + PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_URI, + "hive-uri", + PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_WAREHOUSE, + "hive-warehouse", + "key1", + "value1")); + Assertions.assertEquals( + ImmutableMap.of( + PaimonPropertiesConstants.PAIMON_CATALOG_METASTORE, + PaimonPropertiesConstants.PAIMON_CATALOG_BACKEND_HIVE, + PaimonPropertiesConstants.PAIMON_CATALOG_URI, + "hive-uri", + PaimonPropertiesConstants.PAIMON_CATALOG_WAREHOUSE, + "hive-warehouse"), + properties); + } + + @Test + void testCatalogPropertiesWithJdbcBackend() { + Map properties = + paimonPropertiesConverter.toSparkCatalogProperties( + ImmutableMap.of( + PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND, + PaimonPropertiesConstants.PAIMON_CATALOG_BACKEND_JDBC, + PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_URI, + "jdbc-uri", + PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_WAREHOUSE, + "jdbc-warehouse", + PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_JDBC_USER, + "user", + PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_JDBC_PASSWORD, + "passwd", + "key1", + "value1")); + Assertions.assertEquals( + ImmutableMap.of( + PaimonPropertiesConstants.PAIMON_CATALOG_METASTORE, + PaimonPropertiesConstants.PAIMON_CATALOG_BACKEND_JDBC, + PaimonPropertiesConstants.PAIMON_CATALOG_URI, + "jdbc-uri", + PaimonPropertiesConstants.PAIMON_CATALOG_WAREHOUSE, + "jdbc-warehouse", + PaimonPropertiesConstants.PAIMON_CATALOG_JDBC_USER, + "user", + PaimonPropertiesConstants.PAIMON_CATALOG_JDBC_PASSWORD, + "passwd"), + properties); + } + + @Test + void testCatalogPropertiesWithFilesystemBackend() { + Map properties = + paimonPropertiesConverter.toSparkCatalogProperties( + ImmutableMap.of( + PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND, + PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND_FILESYSTEM, + PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_WAREHOUSE, + "filesystem-warehouse", + "key1", + "value1")); + Assertions.assertEquals( + ImmutableMap.of( + PaimonPropertiesConstants.PAIMON_CATALOG_METASTORE, + PaimonPropertiesConstants.PAIMON_CATALOG_BACKEND_FILESYSTEM, + PaimonPropertiesConstants.PAIMON_CATALOG_WAREHOUSE, + "filesystem-warehouse"), + properties); + } +} diff --git a/spark-connector/v3.3/spark/build.gradle.kts b/spark-connector/v3.3/spark/build.gradle.kts index c4c417d62ef..66c65f863b9 100644 --- a/spark-connector/v3.3/spark/build.gradle.kts +++ b/spark-connector/v3.3/spark/build.gradle.kts @@ -31,6 +31,7 @@ val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extr val sparkVersion: String = libs.versions.spark33.get() val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".") val icebergVersion: String = libs.versions.iceberg4spark.get() +val paimonVersion: String = libs.versions.paimon.get() val kyuubiVersion: String = libs.versions.kyuubi4spark33.get() val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get() val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get() @@ -43,6 +44,9 @@ dependencies { exclude("com.fasterxml.jackson") } compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion") + compileOnly("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") { + exclude("org.apache.spark") + } testImplementation(project(":api")) { exclude("org.apache.logging.log4j") @@ -122,6 +126,9 @@ dependencies { testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion") testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion") testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion") + testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") { + exclude("org.apache.spark") + } testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion") // include spark-sql,spark-catalyst,hive-common,hdfs-client testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") { @@ -134,6 +141,9 @@ dependencies { exclude("com.fasterxml.jackson.core") } testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion") + testImplementation("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion") + testImplementation("org.apache.spark:spark-core_$scalaVersion:$sparkVersion") + testImplementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion") testRuntimeOnly(libs.junit.jupiter.engine) } @@ -152,6 +162,7 @@ tasks.test { dependsOn(":catalogs:catalog-lakehouse-iceberg:jar") dependsOn(":catalogs:catalog-hive:jar") dependsOn(":iceberg:iceberg-rest-server:jar") + dependsOn(":catalogs:catalog-lakehouse-paimon:jar") } } diff --git a/spark-connector/v3.3/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark33.java b/spark-connector/v3.3/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark33.java new file mode 100644 index 00000000000..2fef911a8bd --- /dev/null +++ b/spark-connector/v3.3/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark33.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.spark.connector.paimon; + +public class GravitinoPaimonCatalogSpark33 extends GravitinoPaimonCatalog {} diff --git a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT33.java b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT33.java new file mode 100644 index 00000000000..839b959c777 --- /dev/null +++ b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT33.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.spark.connector.integration.test.paimon; + +import org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark33; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class SparkPaimonCatalogFilesystemBackendIT33 extends SparkPaimonCatalogFilesystemBackendIT { + @Test + void testCatalogClassName() { + String catalogClass = + getSparkSession() + .sessionState() + .conf() + .getConfString("spark.sql.catalog." + getCatalogName()); + Assertions.assertEquals(GravitinoPaimonCatalogSpark33.class.getName(), catalogClass); + } +} diff --git a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java index 1b0af02f87b..37c95e47890 100644 --- a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java +++ b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java @@ -20,6 +20,7 @@ import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33; import org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33; +import org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark33; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -31,5 +32,8 @@ void testSpark33() { String icebergCatalogName = CatalogNameAdaptor.getCatalogName("lakehouse-iceberg"); Assertions.assertEquals(GravitinoIcebergCatalogSpark33.class.getName(), icebergCatalogName); + + String paimonCatalogName = CatalogNameAdaptor.getCatalogName("lakehouse-paimon"); + Assertions.assertEquals(GravitinoPaimonCatalogSpark33.class.getName(), paimonCatalogName); } } diff --git a/spark-connector/v3.4/spark/build.gradle.kts b/spark-connector/v3.4/spark/build.gradle.kts index f3308fca34b..aa4134a3c71 100644 --- a/spark-connector/v3.4/spark/build.gradle.kts +++ b/spark-connector/v3.4/spark/build.gradle.kts @@ -31,6 +31,7 @@ val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extr val sparkVersion: String = libs.versions.spark34.get() val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".") val icebergVersion: String = libs.versions.iceberg4spark.get() +val paimonVersion: String = libs.versions.paimon.get() val kyuubiVersion: String = libs.versions.kyuubi4spark34.get() val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get() val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get() @@ -44,6 +45,9 @@ dependencies { } compileOnly(project(":clients:client-java-runtime", configuration = "shadow")) compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion") + compileOnly("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") { + exclude("org.apache.spark") + } testImplementation(project(":api")) { exclude("org.apache.logging.log4j") @@ -122,6 +126,9 @@ dependencies { testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion") testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion") testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion") + testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") { + exclude("org.apache.spark") + } testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion") // include spark-sql,spark-catalyst,hive-common,hdfs-client testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") { @@ -134,6 +141,9 @@ dependencies { exclude("com.fasterxml.jackson.core") } testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion") + testImplementation("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion") + testImplementation("org.apache.spark:spark-core_$scalaVersion:$sparkVersion") + testImplementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion") testRuntimeOnly(libs.junit.jupiter.engine) } @@ -152,6 +162,7 @@ tasks.test { dependsOn(":catalogs:catalog-lakehouse-iceberg:jar") dependsOn(":catalogs:catalog-hive:jar") dependsOn(":iceberg:iceberg-rest-server:jar") + dependsOn(":catalogs:catalog-lakehouse-paimon:jar") } } diff --git a/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark34.java b/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark34.java new file mode 100644 index 00000000000..eb3e8779369 --- /dev/null +++ b/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark34.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.spark.connector.paimon; + +import org.apache.gravitino.spark.connector.SparkTableChangeConverter; +import org.apache.gravitino.spark.connector.SparkTableChangeConverter34; +import org.apache.gravitino.spark.connector.SparkTypeConverter; +import org.apache.gravitino.spark.connector.SparkTypeConverter34; + +public class GravitinoPaimonCatalogSpark34 extends GravitinoPaimonCatalog { + @Override + protected SparkTypeConverter getSparkTypeConverter() { + return new SparkTypeConverter34(); + } + + @Override + protected SparkTableChangeConverter getSparkTableChangeConverter( + SparkTypeConverter sparkTypeConverter) { + return new SparkTableChangeConverter34(sparkTypeConverter); + } +} diff --git a/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT34.java b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT34.java new file mode 100644 index 00000000000..d230707325c --- /dev/null +++ b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT34.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.spark.connector.integration.test.paimon; + +import org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark34; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class SparkPaimonCatalogFilesystemBackendIT34 extends SparkPaimonCatalogFilesystemBackendIT { + + @Test + void testCatalogClassName() { + String catalogClass = + getSparkSession() + .sessionState() + .conf() + .getConfString("spark.sql.catalog." + getCatalogName()); + Assertions.assertEquals(GravitinoPaimonCatalogSpark34.class.getName(), catalogClass); + } +} diff --git a/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java index a2e95c8ea30..af9e67fab88 100644 --- a/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java +++ b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java @@ -20,6 +20,7 @@ import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark34; import org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark34; +import org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark34; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -31,5 +32,8 @@ void testSpark34() { String icebergCatalogName = CatalogNameAdaptor.getCatalogName("lakehouse-iceberg"); Assertions.assertEquals(GravitinoIcebergCatalogSpark34.class.getName(), icebergCatalogName); + + String paimonCatalogName = CatalogNameAdaptor.getCatalogName("lakehouse-paimon"); + Assertions.assertEquals(GravitinoPaimonCatalogSpark34.class.getName(), paimonCatalogName); } } diff --git a/spark-connector/v3.5/spark/build.gradle.kts b/spark-connector/v3.5/spark/build.gradle.kts index 7b8cc8447b7..15aa018081d 100644 --- a/spark-connector/v3.5/spark/build.gradle.kts +++ b/spark-connector/v3.5/spark/build.gradle.kts @@ -31,6 +31,7 @@ val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extr val sparkVersion: String = libs.versions.spark35.get() val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".") val icebergVersion: String = libs.versions.iceberg4spark.get() +val paimonVersion: String = libs.versions.paimon.get() val kyuubiVersion: String = libs.versions.kyuubi4spark35.get() val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get() val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get() @@ -45,6 +46,9 @@ dependencies { } compileOnly(project(":clients:client-java-runtime", configuration = "shadow")) compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion") + compileOnly("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") { + exclude("org.apache.spark") + } testImplementation(project(":api")) { exclude("org.apache.logging.log4j") @@ -124,6 +128,9 @@ dependencies { testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion") testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion") testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion") + testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") { + exclude("org.apache.spark") + } testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion") // include spark-sql,spark-catalyst,hive-common,hdfs-client testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") { @@ -136,6 +143,9 @@ dependencies { exclude("com.fasterxml.jackson.core") } testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion") + testImplementation("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion") + testImplementation("org.apache.spark:spark-core_$scalaVersion:$sparkVersion") + testImplementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion") testRuntimeOnly(libs.junit.jupiter.engine) } @@ -154,6 +164,7 @@ tasks.test { dependsOn(":catalogs:catalog-lakehouse-iceberg:jar") dependsOn(":catalogs:catalog-hive:jar") dependsOn(":iceberg:iceberg-rest-server:jar") + dependsOn(":catalogs:catalog-lakehouse-paimon:jar") } } diff --git a/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark35.java b/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark35.java new file mode 100644 index 00000000000..2c39af5b2f7 --- /dev/null +++ b/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark35.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.spark.connector.paimon; + +public class GravitinoPaimonCatalogSpark35 extends GravitinoPaimonCatalogSpark34 {} diff --git a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT35.java b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT35.java new file mode 100644 index 00000000000..44281c76ef0 --- /dev/null +++ b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT35.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.spark.connector.integration.test.paimon; + +import org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark35; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class SparkPaimonCatalogFilesystemBackendIT35 extends SparkPaimonCatalogFilesystemBackendIT { + + @Test + void testCatalogClassName() { + String catalogClass = + getSparkSession() + .sessionState() + .conf() + .getConfString("spark.sql.catalog." + getCatalogName()); + Assertions.assertEquals(GravitinoPaimonCatalogSpark35.class.getName(), catalogClass); + } +} diff --git a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java index 5295e82fb24..f02584cd616 100644 --- a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java +++ b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java @@ -20,6 +20,7 @@ import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark35; import org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark35; +import org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark35; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -31,5 +32,8 @@ void testSpark35() { String icebergCatalogName = CatalogNameAdaptor.getCatalogName("lakehouse-iceberg"); Assertions.assertEquals(GravitinoIcebergCatalogSpark35.class.getName(), icebergCatalogName); + + String paimonCatalogName = CatalogNameAdaptor.getCatalogName("lakehouse-paimon"); + Assertions.assertEquals(GravitinoPaimonCatalogSpark35.class.getName(), paimonCatalogName); } }