Skip to content

Commit

Permalink
[#4938]feat(lakehouse-paimon): Support S3 filesystem for Paimon catal…
Browse files Browse the repository at this point in the history
…og. (#4939)

### What changes were proposed in this pull request?

Add support for Paimon S3 filesystem.

Note: related documents will be added in another PR.

### Why are the changes needed?

To provide a better user experience: users can configure S3-backed Paimon catalogs directly through Gravitino catalog properties.

Fix: #4938

### Does this PR introduce _any_ user-facing change?

N/A

### How was this patch tested?

Tested locally and with integration tests (IT).
  • Loading branch information
yuqi1129 authored Oct 10, 2024
1 parent 4ff652a commit f753afa
Show file tree
Hide file tree
Showing 12 changed files with 301 additions and 21 deletions.
3 changes: 2 additions & 1 deletion catalogs/catalog-common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ plugins {

// try to avoid adding extra dependencies because it is used by catalogs and connectors.
dependencies {
implementation(libs.slf4j.api)
implementation(libs.commons.lang3)
implementation(libs.guava)
implementation(libs.slf4j.api)

testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
Expand Down
6 changes: 6 additions & 0 deletions catalogs/catalog-lakehouse-paimon/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ dependencies {
implementation(project(":api")) {
exclude("*")
}
implementation(project(":catalogs:catalog-common")) {
exclude("*")
}
implementation(project(":common")) {
exclude("*")
}
Expand Down Expand Up @@ -121,7 +124,10 @@ dependencies {
testImplementation(libs.postgresql.driver)
testImplementation(libs.bundles.log4j)
testImplementation(libs.junit.jupiter.params)
testImplementation(libs.paimon.s3)
testImplementation(libs.paimon.spark)
testImplementation(libs.testcontainers)
testImplementation(libs.testcontainers.localstack)

testRuntimeOnly(libs.junit.jupiter.engine)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
import java.util.Map;
import org.apache.gravitino.catalog.lakehouse.paimon.authentication.AuthenticationConfig;
import org.apache.gravitino.catalog.lakehouse.paimon.authentication.kerberos.KerberosConfig;
import org.apache.gravitino.catalog.lakehouse.paimon.storage.S3StorageConfig;
import org.apache.gravitino.connector.BaseCatalogPropertiesMetadata;
import org.apache.gravitino.connector.PropertiesMetadata;
import org.apache.gravitino.connector.PropertyEntry;
import org.apache.gravitino.storage.S3Properties;

/**
* Implementation of {@link PropertiesMetadata} that represents Paimon catalog properties metadata.
Expand All @@ -45,6 +47,11 @@ public class PaimonCatalogPropertiesMetadata extends BaseCatalogPropertiesMetada
public static final String WAREHOUSE = "warehouse";
public static final String URI = "uri";

// S3 properties needed by Paimon
public static final String S3_ENDPOINT = "s3.endpoint";
public static final String S3_ACCESS_KEY = "s3.access-key";
public static final String S3_SECRET_KEY = "s3.secret-key";

public static final Map<String, String> GRAVITINO_CONFIG_TO_PAIMON =
ImmutableMap.of(GRAVITINO_CATALOG_BACKEND, PAIMON_METASTORE, WAREHOUSE, WAREHOUSE, URI, URI);
private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
Expand All @@ -61,6 +68,12 @@ public class PaimonCatalogPropertiesMetadata extends BaseCatalogPropertiesMetada
AuthenticationConfig.AUTH_TYPE_KEY,
AuthenticationConfig.AUTH_TYPE_KEY);

public static final Map<String, String> S3_CONFIGURATION =
ImmutableMap.of(
S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, S3_ACCESS_KEY,
S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, S3_SECRET_KEY,
S3Properties.GRAVITINO_S3_ENDPOINT, S3_ENDPOINT);

static {
List<PropertyEntry<?>> propertyEntries =
ImmutableList.of(
Expand Down Expand Up @@ -88,6 +101,8 @@ public class PaimonCatalogPropertiesMetadata extends BaseCatalogPropertiesMetada
result.putAll(Maps.uniqueIndex(propertyEntries, PropertyEntry::getName));
result.putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES);
result.putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES);
result.putAll(S3StorageConfig.S3_FILESYSTEM_PROPERTY_ENTRIES);

PROPERTIES_METADATA = ImmutableMap.copyOf(result);
}

Expand All @@ -107,6 +122,10 @@ protected Map<String, String> transformProperties(Map<String, String> properties
if (KERBEROS_CONFIGURATION.containsKey(key)) {
gravitinoConfig.put(KERBEROS_CONFIGURATION.get(key), value);
}

if (S3_CONFIGURATION.containsKey(key)) {
gravitinoConfig.put(S3_CONFIGURATION.get(key), value);
}
});
return gravitinoConfig;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.catalog.lakehouse.paimon.storage;

import static org.apache.gravitino.storage.S3Properties.GRAVITINO_S3_ACCESS_KEY_ID;
import static org.apache.gravitino.storage.S3Properties.GRAVITINO_S3_ENDPOINT;
import static org.apache.gravitino.storage.S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Config;
import org.apache.gravitino.config.ConfigBuilder;
import org.apache.gravitino.config.ConfigConstants;
import org.apache.gravitino.config.ConfigEntry;
import org.apache.gravitino.connector.PropertyEntry;

/**
 * Typed view of the S3 filesystem settings of a Paimon catalog.
 *
 * <p>Exposes {@link ConfigEntry} accessors for the unified Gravitino S3 endpoint and credential
 * keys, plus the {@link PropertyEntry} table ({@link #S3_FILESYSTEM_PROPERTY_ENTRIES}) used to
 * register those keys as optional catalog properties.
 */
public class S3StorageConfig extends Config {

  // Unified S3 entries. A value, once supplied, must be non-blank.
  public static final ConfigEntry<String> PAIMON_S3_ENDPOINT_ENTRY =
      requiredStringEntry(GRAVITINO_S3_ENDPOINT, "The endpoint of the AWS s3");

  public static final ConfigEntry<String> PAIMON_S3_ACCESS_KEY_ENTRY =
      requiredStringEntry(GRAVITINO_S3_ACCESS_KEY_ID, "The access key of the AWS s3");

  public static final ConfigEntry<String> PAIMON_S3_SECRET_KEY_ENTRY =
      requiredStringEntry(GRAVITINO_S3_SECRET_ACCESS_KEY, "The secret key of the AWS s3");

  /** Optional, mutable, visible catalog property entries for the three S3 settings above. */
  public static final Map<String, PropertyEntry<?>> S3_FILESYSTEM_PROPERTY_ENTRIES =
      ImmutableMap.<String, PropertyEntry<?>>of(
          GRAVITINO_S3_ENDPOINT,
          optionalStringEntry(GRAVITINO_S3_ENDPOINT, "The endpoint of the AWS s3"),
          GRAVITINO_S3_ACCESS_KEY_ID,
          optionalStringEntry(GRAVITINO_S3_ACCESS_KEY_ID, "The access key of the AWS s3"),
          GRAVITINO_S3_SECRET_ACCESS_KEY,
          optionalStringEntry(GRAVITINO_S3_SECRET_ACCESS_KEY, "The secret key of the AWS s3"));

  /**
   * Builds the config from a raw property map; every key is accepted unchanged.
   *
   * @param properties the raw catalog properties to load
   */
  public S3StorageConfig(Map<String, String> properties) {
    super(false); // do not preload any default configuration
    loadFromMap(properties, property -> true);
  }

  /** @return the configured S3 endpoint. */
  public String getS3Endpoint() {
    return get(PAIMON_S3_ENDPOINT_ENTRY);
  }

  /** @return the configured S3 access key id. */
  public String getS3AccessKey() {
    return get(PAIMON_S3_ACCESS_KEY_ENTRY);
  }

  /** @return the configured S3 secret access key. */
  public String getS3SecretKey() {
    return get(PAIMON_S3_SECRET_KEY_ENTRY);
  }

  // Builds a string ConfigEntry whose value, when present, must be non-blank.
  private static ConfigEntry<String> requiredStringEntry(String key, String doc) {
    return new ConfigBuilder(key)
        .doc(doc)
        .version(ConfigConstants.VERSION_0_7_0)
        .stringConf()
        .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG)
        .create();
  }

  // Builds an optional, mutable, visible string PropertyEntry with no default value.
  private static PropertyEntry<String> optionalStringEntry(String key, String doc) {
    return PropertyEntry.stringOptionalPropertyEntry(
        key, doc, false /* immutable */, null /* defaultValue */, false /* hidden */);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.gravitino.catalog.lakehouse.paimon.utils;

import static org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata.S3_CONFIGURATION;
import static org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig.CATALOG_BACKEND;
import static org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig.CATALOG_URI;
import static org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig.CATALOG_WAREHOUSE;
Expand All @@ -26,6 +27,7 @@

import com.google.common.base.Preconditions;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -120,4 +122,12 @@ private static void checkPaimonConfig(PaimonConfig paimonConfig) {
StringUtils.isNotBlank(uri), "Paimon Catalog uri can not be null or empty.");
}
}

/**
 * Translates Gravitino catalog properties into Paimon catalog properties.
 *
 * <p>Keys present in {@code S3_CONFIGURATION} are renamed to their Paimon-specific S3
 * counterparts; every other entry is copied through unchanged. Later entries overwrite
 * earlier ones if two source keys map to the same Paimon key.
 *
 * @param gravitinoProperties the Gravitino-side catalog properties
 * @return a new mutable map holding the Paimon-side properties
 */
public static Map<String, String> toPaimonCatalogProperties(
    Map<String, String> gravitinoProperties) {
  Map<String, String> paimonProperties = new HashMap<>();
  for (Map.Entry<String, String> entry : gravitinoProperties.entrySet()) {
    String paimonKey = S3_CONFIGURATION.getOrDefault(entry.getKey(), entry.getKey());
    paimonProperties.put(paimonKey, entry.getValue());
  }
  return paimonProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,23 @@ public abstract class CatalogPaimonBaseIT extends AbstractIT {
private GravitinoMetalake metalake;
private Catalog catalog;
private org.apache.paimon.catalog.Catalog paimonCatalog;
private SparkSession spark;
protected SparkSession spark;
private Map<String, String> catalogProperties;

@BeforeAll
public void startup() {
  // startNecessaryContainers() already starts the Hive container (see its base
  // implementation below); subclasses override it to add more containers (e.g. S3
  // localstack). The stale direct call to containerSuite.startHiveContainer() that
  // duplicated this work has been removed so the container is started only once.
  startNecessaryContainers();
  catalogProperties = initPaimonCatalogProperties();
  createMetalake();
  createCatalog();
  createSchema();
  initSparkEnv();
}

// Starts the backing containers the tests need. The base implementation starts only the
// Hive container; declared protected so subclasses can override it to start additional
// containers (e.g. an S3-compatible store) before the catalog is created.
protected void startNecessaryContainers() {
containerSuite.startHiveContainer();
}

@AfterAll
public void stop() {
clearTableAndSchema();
Expand Down Expand Up @@ -882,8 +886,13 @@ private void createCatalog() {
Preconditions.checkArgument(
StringUtils.isNotBlank(type), "Paimon Catalog backend type can not be null or empty.");
catalogProperties.put(PaimonCatalogPropertiesMetadata.PAIMON_METASTORE, type);

// Why is this conversion needed? Because PaimonCatalogOperations#initialize converts the
// general Gravitino S3 properties into Paimon-specific S3 properties.
Map<String, String> copy = CatalogUtils.toPaimonCatalogProperties(catalogProperties);

PaimonBackendCatalogWrapper paimonBackendCatalogWrapper =
CatalogUtils.loadCatalogBackend(new PaimonConfig(catalogProperties));
CatalogUtils.loadCatalogBackend(new PaimonConfig(copy));
paimonCatalog = paimonBackendCatalogWrapper.getCatalog();
}

Expand Down Expand Up @@ -926,7 +935,7 @@ private Map<String, String> createProperties() {
return properties;
}

private void initSparkEnv() {
protected void initSparkEnv() {
spark =
SparkSession.builder()
.master("local[1]")
Expand Down
Loading

0 comments on commit f753afa

Please sign in to comment.