Skip to content

Commit

Permalink
[#4722] feat(paimon-spark-connector): support schema and table DDL an…
Browse files Browse the repository at this point in the history
…d table DML for GravitinoPaimonCatalog in paimon spark connector (#5722)

### What changes were proposed in this pull request?
support schema and table DDL and table DML for GravitinoPaimonCatalog in
paimon spark connector.

### Why are the changes needed?
Fix: 
#4722
#4717

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
new Its and UTs.

---------

Co-authored-by: caican <[email protected]>
  • Loading branch information
caican00 and caican authored Dec 16, 2024
1 parent 7a8b090 commit 8732175
Show file tree
Hide file tree
Showing 35 changed files with 1,081 additions and 64 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.catalog.lakehouse.paimon;

public class PaimonConstants {

// Paimon catalog properties constants
public static final String CATALOG_BACKEND = "catalog-backend";
public static final String METASTORE = "metastore";
public static final String URI = "uri";
public static final String WAREHOUSE = "warehouse";
public static final String CATALOG_BACKEND_NAME = "catalog-backend-name";

public static final String GRAVITINO_JDBC_USER = "jdbc-user";
public static final String PAIMON_JDBC_USER = "jdbc.user";

public static final String GRAVITINO_JDBC_PASSWORD = "jdbc-password";
public static final String PAIMON_JDBC_PASSWORD = "jdbc.password";

public static final String GRAVITINO_JDBC_DRIVER = "jdbc-driver";

// S3 properties needed by Paimon
public static final String S3_ENDPOINT = "s3.endpoint";
public static final String S3_ACCESS_KEY = "s3.access-key";
public static final String S3_SECRET_KEY = "s3.secret-key";

// OSS related properties
public static final String OSS_ENDPOINT = "fs.oss.endpoint";
public static final String OSS_ACCESS_KEY = "fs.oss.accessKeyId";
public static final String OSS_SECRET_KEY = "fs.oss.accessKeySecret";

// Iceberg Table properties constants
public static final String COMMENT = "comment";
public static final String OWNER = "owner";
public static final String BUCKET_KEY = "bucket-key";
public static final String MERGE_ENGINE = "merge-engine";
public static final String SEQUENCE_FIELD = "sequence.field";
public static final String ROWKIND_FIELD = "rowkind.field";
public static final String PRIMARY_KEY = "primary-key";
public static final String PARTITION = "partition";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.catalog.lakehouse.paimon;

import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import org.apache.gravitino.storage.OSSProperties;
import org.apache.gravitino.storage.S3Properties;

public class PaimonPropertiesUtils {

// Map that maintains the mapping of keys in Gravitino to that in Paimon, for example, users
// will only need to set the configuration 'catalog-backend' in Gravitino and Gravitino will
// change it to `catalogType` automatically and pass it to Paimon.
public static final Map<String, String> GRAVITINO_CONFIG_TO_PAIMON;

static {
Map<String, String> map = new HashMap();
map.put(PaimonConstants.CATALOG_BACKEND, PaimonConstants.CATALOG_BACKEND);
map.put(PaimonConstants.GRAVITINO_JDBC_DRIVER, PaimonConstants.GRAVITINO_JDBC_DRIVER);
map.put(PaimonConstants.GRAVITINO_JDBC_USER, PaimonConstants.PAIMON_JDBC_USER);
map.put(PaimonConstants.GRAVITINO_JDBC_PASSWORD, PaimonConstants.PAIMON_JDBC_PASSWORD);
map.put(PaimonConstants.URI, PaimonConstants.URI);
map.put(PaimonConstants.WAREHOUSE, PaimonConstants.WAREHOUSE);
map.put(PaimonConstants.CATALOG_BACKEND_NAME, PaimonConstants.CATALOG_BACKEND_NAME);
// S3
map.put(S3Properties.GRAVITINO_S3_ENDPOINT, PaimonConstants.S3_ENDPOINT);
map.put(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, PaimonConstants.S3_ACCESS_KEY);
map.put(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, PaimonConstants.S3_SECRET_KEY);
// OSS
map.put(OSSProperties.GRAVITINO_OSS_ENDPOINT, PaimonConstants.OSS_ENDPOINT);
map.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID, PaimonConstants.OSS_ACCESS_KEY);
map.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET, PaimonConstants.OSS_SECRET_KEY);
GRAVITINO_CONFIG_TO_PAIMON = Collections.unmodifiableMap(map);
}

/**
* Converts Gravitino properties to Paimon catalog properties, the common transform logic shared
* by Spark connector, Gravitino Paimon catalog.
*
* @param gravitinoProperties a map of Gravitino configuration properties.
* @return a map containing Paimon catalog properties.
*/
public static Map<String, String> toPaimonCatalogProperties(
Map<String, String> gravitinoProperties) {
Map<String, String> paimonProperties = new HashMap<>();
gravitinoProperties.forEach(
(key, value) -> {
if (GRAVITINO_CONFIG_TO_PAIMON.containsKey(key)) {
paimonProperties.put(GRAVITINO_CONFIG_TO_PAIMON.get(key), value);
}
});
return paimonProperties;
}

/**
* Get catalog backend name from Gravitino catalog properties.
*
* @param catalogProperties a map of Gravitino catalog properties.
* @return catalog backend name.
*/
public static String getCatalogBackendName(Map<String, String> catalogProperties) {
String backendName = catalogProperties.get(PaimonConstants.CATALOG_BACKEND_NAME);
if (backendName != null) {
return backendName;
}

String catalogBackend = catalogProperties.get(PaimonConstants.CATALOG_BACKEND);
return Optional.ofNullable(catalogBackend)
.map(s -> s.toLowerCase(Locale.ROOT))
.orElseThrow(
() ->
new UnsupportedOperationException(
String.format("Unsupported catalog backend: %s", catalogBackend)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,22 @@
*/
public class PaimonCatalogPropertiesMetadata extends BaseCatalogPropertiesMetadata {

@VisibleForTesting public static final String GRAVITINO_CATALOG_BACKEND = "catalog-backend";
public static final String PAIMON_METASTORE = "metastore";
public static final String WAREHOUSE = "warehouse";
public static final String URI = "uri";
public static final String GRAVITINO_JDBC_USER = "jdbc-user";
public static final String PAIMON_JDBC_USER = "jdbc.user";
public static final String GRAVITINO_JDBC_PASSWORD = "jdbc-password";
public static final String PAIMON_JDBC_PASSWORD = "jdbc.password";
public static final String GRAVITINO_JDBC_DRIVER = "jdbc-driver";
@VisibleForTesting
public static final String GRAVITINO_CATALOG_BACKEND = PaimonConstants.CATALOG_BACKEND;

public static final String PAIMON_METASTORE = PaimonConstants.METASTORE;
public static final String WAREHOUSE = PaimonConstants.WAREHOUSE;
public static final String URI = PaimonConstants.URI;
public static final String GRAVITINO_JDBC_USER = PaimonConstants.GRAVITINO_JDBC_USER;
public static final String PAIMON_JDBC_USER = PaimonConstants.PAIMON_JDBC_USER;
public static final String GRAVITINO_JDBC_PASSWORD = PaimonConstants.GRAVITINO_JDBC_PASSWORD;
public static final String PAIMON_JDBC_PASSWORD = PaimonConstants.PAIMON_JDBC_PASSWORD;
public static final String GRAVITINO_JDBC_DRIVER = PaimonConstants.GRAVITINO_JDBC_DRIVER;

// S3 properties needed by Paimon
public static final String S3_ENDPOINT = "s3.endpoint";
public static final String S3_ACCESS_KEY = "s3.access-key";
public static final String S3_SECRET_KEY = "s3.secret-key";
public static final String S3_ENDPOINT = PaimonConstants.S3_ENDPOINT;
public static final String S3_ACCESS_KEY = PaimonConstants.S3_ACCESS_KEY;
public static final String S3_SECRET_KEY = PaimonConstants.S3_SECRET_KEY;

public static final Map<String, String> GRAVITINO_CONFIG_TO_PAIMON =
ImmutableMap.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
*/
public class PaimonSchemaPropertiesMetadata extends BasePropertiesMetadata {

public static final String COMMENT = "comment";
public static final String COMMENT = PaimonConstants.COMMENT;

private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@
*/
public class PaimonTablePropertiesMetadata extends BasePropertiesMetadata {

public static final String COMMENT = "comment";
public static final String OWNER = "owner";
public static final String BUCKET_KEY = "bucket-key";
public static final String MERGE_ENGINE = "merge-engine";
public static final String SEQUENCE_FIELD = "sequence.field";
public static final String ROWKIND_FIELD = "rowkind.field";
public static final String PRIMARY_KEY = "primary-key";
public static final String PARTITION = "partition";
public static final String COMMENT = PaimonConstants.COMMENT;
public static final String OWNER = PaimonConstants.OWNER;
public static final String BUCKET_KEY = PaimonConstants.BUCKET_KEY;
public static final String MERGE_ENGINE = PaimonConstants.MERGE_ENGINE;
public static final String SEQUENCE_FIELD = PaimonConstants.SEQUENCE_FIELD;
public static final String ROWKIND_FIELD = PaimonConstants.ROWKIND_FIELD;
public static final String PRIMARY_KEY = PaimonConstants.PRIMARY_KEY;
public static final String PARTITION = PaimonConstants.PARTITION;

private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Config;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
import org.apache.gravitino.config.ConfigBuilder;
import org.apache.gravitino.config.ConfigConstants;
import org.apache.gravitino.config.ConfigEntry;
import org.apache.gravitino.connector.PropertyEntry;

public class PaimonOSSFileSystemConfig extends Config {
// OSS related properties
public static final String OSS_ENDPOINT = "fs.oss.endpoint";
public static final String OSS_ACCESS_KEY = "fs.oss.accessKeyId";
public static final String OSS_SECRET_KEY = "fs.oss.accessKeySecret";
public static final String OSS_ENDPOINT = PaimonConstants.OSS_ENDPOINT;
public static final String OSS_ACCESS_KEY = PaimonConstants.OSS_ACCESS_KEY;
public static final String OSS_SECRET_KEY = PaimonConstants.OSS_SECRET_KEY;

public PaimonOSSFileSystemConfig(Map<String, String> properties) {
super(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Config;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
import org.apache.gravitino.config.ConfigBuilder;
import org.apache.gravitino.config.ConfigConstants;
import org.apache.gravitino.config.ConfigEntry;
import org.apache.gravitino.connector.PropertyEntry;

public class PaimonS3FileSystemConfig extends Config {
// S3 related properties
public static final String S3_ENDPOINT = "s3.endpoint";
public static final String S3_ACCESS_KEY = "s3.access-key";
public static final String S3_SECRET_KEY = "s3.secret-key";
public static final String S3_ENDPOINT = PaimonConstants.S3_ENDPOINT;
public static final String S3_ACCESS_KEY = PaimonConstants.S3_ACCESS_KEY;
public static final String S3_SECRET_KEY = PaimonConstants.S3_SECRET_KEY;

public PaimonS3FileSystemConfig(Map<String, String> properties) {
super(false);
Expand Down
Loading

0 comments on commit 8732175

Please sign in to comment.