[#5220] improvement(hadoop-catalog): Optimize the name properties keys for Hadoop catalog. #5372

Merged
3 changes: 3 additions & 0 deletions bundles/aliyun-bundle/build.gradle.kts
@@ -35,6 +35,9 @@ dependencies {
// org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.initialize(AliyunOSSFileSystem.java:323)
// org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3611)
implementation(libs.commons.lang)
implementation(project(":catalogs:catalog-common")) {
exclude("*")
}
}

tasks.withType(ShadowJar::class.java) {
@@ -18,23 +18,49 @@
*/
package org.apache.gravitino.oss.fs;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.storage.OSSProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
import org.apache.hadoop.fs.aliyun.oss.Constants;

public class OSSFileSystemProvider implements FileSystemProvider {

private static final String OSS_FILESYSTEM_IMPL = "fs.oss.impl";

// This map maintains the mapping between OSS property keys defined in Gravitino and the
// corresponding Hadoop property keys. Through this map, users can set OSS properties in
// Gravitino and have them translated to the matching Hadoop properties.
// For example, users can set 'oss-endpoint' in Gravitino to configure the OSS endpoint
// 'fs.oss.endpoint'.
// GCS and S3 have similar mappings.

@VisibleForTesting
public static final Map<String, String> GRAVITINO_KEY_TO_OSS_HADOOP_KEY =
ImmutableMap.of(
OSSProperties.GRAVITINO_OSS_ENDPOINT, Constants.ENDPOINT_KEY,
OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID, Constants.ACCESS_KEY_ID,
OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET, Constants.ACCESS_KEY_SECRET);

@Override
public FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException {
Configuration configuration = new Configuration();
config.forEach(
(k, v) -> {
configuration.set(k.replace("gravitino.bypass.", ""), v);
});

Map<String, String> hadoopConfMap =
FileSystemUtils.toHadoopConfigMap(config, GRAVITINO_KEY_TO_OSS_HADOOP_KEY);
// OSS does not use the service loader to load the file system, so we need to set the impl class explicitly
if (!hadoopConfMap.containsKey(OSS_FILESYSTEM_IMPL)) {
hadoopConfMap.put(OSS_FILESYSTEM_IMPL, AliyunOSSFileSystem.class.getCanonicalName());
}

hadoopConfMap.forEach(configuration::set);
return AliyunOSSFileSystem.newInstance(path.toUri(), configuration);
}
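
For context, here is a minimal sketch of how this predefined mapping plus the bypass prefix is intended to translate a Gravitino property map. The property values, the pass-through key, and the resulting Hadoop key names for the access key id/secret are illustrative assumptions, not taken from this PR:

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.oss.fs.OSSFileSystemProvider;
import org.apache.gravitino.storage.OSSProperties;

public class OssKeyMappingSketch {
  public static void main(String[] args) {
    // Gravitino-style properties as a caller might pass them; values are placeholders.
    Map<String, String> gravitinoConfig =
        ImmutableMap.of(
            OSSProperties.GRAVITINO_OSS_ENDPOINT, "oss-cn-hangzhou.aliyuncs.com",
            OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID, "exampleAccessKeyId",
            OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET, "exampleAccessKeySecret",
            // A pass-through key with no Gravitino-defined counterpart (illustrative only).
            "gravitino.bypass.fs.oss.connection.maximum", "64");

    // Translate to Hadoop keys using the provider's predefined mapping.
    Map<String, String> hadoopConfig =
        FileSystemUtils.toHadoopConfigMap(
            gravitinoConfig, OSSFileSystemProvider.GRAVITINO_KEY_TO_OSS_HADOOP_KEY);

    // Assuming the hadoop-aliyun constants resolve to fs.oss.endpoint, fs.oss.accessKeyId and
    // fs.oss.accessKeySecret, the map now contains those keys plus fs.oss.connection.maximum.
    hadoopConfig.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}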

3 changes: 3 additions & 0 deletions bundles/aws-bundle/build.gradle.kts
@@ -35,6 +35,9 @@ dependencies {
implementation(libs.aws.policy)
implementation(libs.aws.sts)
implementation(libs.hadoop3.aws)
implementation(project(":catalogs:catalog-common")) {
exclude("*")
}
}

tasks.withType(ShadowJar::class.java) {
@@ -19,23 +19,39 @@

package org.apache.gravitino.s3.fs;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.storage.S3Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

public class S3FileSystemProvider implements FileSystemProvider {

@VisibleForTesting
public static final Map<String, String> GRAVITINO_KEY_TO_S3_HADOOP_KEY =
ImmutableMap.of(
S3Properties.GRAVITINO_S3_ENDPOINT, Constants.ENDPOINT,
S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, Constants.ACCESS_KEY,
S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, Constants.SECRET_KEY);

@Override
public FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException {
Configuration configuration = new Configuration();
config.forEach(
(k, v) -> {
configuration.set(k.replace("gravitino.bypass.", ""), v);
});
Map<String, String> hadoopConfMap =
FileSystemUtils.toHadoopConfigMap(config, GRAVITINO_KEY_TO_S3_HADOOP_KEY);

if (!hadoopConfMap.containsKey(Constants.AWS_CREDENTIALS_PROVIDER)) {
configuration.set(
Constants.AWS_CREDENTIALS_PROVIDER, Constants.ASSUMED_ROLE_CREDENTIALS_DEFAULT);
}
hadoopConfMap.forEach(configuration::set);
return S3AFileSystem.newInstance(path.toUri(), configuration);
}
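
As a usage illustration only, a hedged sketch of obtaining an S3A FileSystem through this provider, overriding the default credentials provider via the bypass prefix. The bucket, endpoint, and credentials are placeholders, and the call assumes the bucket is actually reachable:

import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.s3.fs.S3FileSystemProvider;
import org.apache.gravitino.storage.S3Properties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3ProviderSketch {
  public static void main(String[] args) throws IOException {
    S3FileSystemProvider provider = new S3FileSystemProvider();

    // Gravitino-style keys; all values here are placeholders.
    Map<String, String> config =
        ImmutableMap.of(
            S3Properties.GRAVITINO_S3_ENDPOINT, "https://s3.us-west-2.amazonaws.com",
            S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, "exampleAccessKeyId",
            S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, "exampleSecretKey",
            // Bypass key: overrides the assumed-role credentials provider defaulted above.
            "gravitino.bypass.fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");

    FileSystem fs = provider.getFileSystem(new Path("s3a://example-bucket/warehouse"), config);
    System.out.println(fs.getUri());
    fs.close();
  }
}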

3 changes: 3 additions & 0 deletions bundles/gcp-bundle/build.gradle.kts
@@ -36,6 +36,9 @@ dependencies {
// runtime used
implementation(libs.commons.logging)
implementation(libs.hadoop3.gcs)
implementation(project(":catalogs:catalog-common")) {
exclude("*")
}
implementation(libs.google.auth.http)
implementation(libs.google.auth.credentials)
}
@@ -19,9 +19,13 @@
package org.apache.gravitino.gcs.fs;

import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.storage.GCSProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -30,15 +34,18 @@

public class GCSFileSystemProvider implements FileSystemProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(GCSFileSystemProvider.class);
private static final String GCS_SERVICE_ACCOUNT_JSON_FILE =
"fs.gs.auth.service.account.json.keyfile";

@VisibleForTesting
public static final Map<String, String> GRAVITINO_KEY_TO_GCS_HADOOP_KEY =
ImmutableMap.of(GCSProperties.GCS_SERVICE_ACCOUNT_JSON_PATH, GCS_SERVICE_ACCOUNT_JSON_FILE);

@Override
public FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException {
Configuration configuration = new Configuration();
config.forEach(
(k, v) -> {
configuration.set(k.replace("gravitino.bypass.", ""), v);
});

FileSystemUtils.toHadoopConfigMap(config, GRAVITINO_KEY_TO_GCS_HADOOP_KEY)
.forEach(configuration::set);
LOGGER.info("Creating GCS file system with config: {}", config);
return GoogleHadoopFileSystem.newInstance(path.toUri(), configuration);
}
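
A similar, very small sketch for GCS: only the service-account key-file property is predefined here, so everything else flows through either as-is or via the bypass prefix. The file path and the pass-through key are illustrative assumptions:

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.gcs.fs.GCSFileSystemProvider;
import org.apache.gravitino.storage.GCSProperties;

public class GcsKeyMappingSketch {
  public static void main(String[] args) {
    Map<String, String> gravitinoConfig =
        ImmutableMap.of(
            GCSProperties.GCS_SERVICE_ACCOUNT_JSON_PATH, "/tmp/service-account.json",
            // An arbitrary pass-through key, shown only to illustrate the bypass prefix.
            "gravitino.bypass.fs.gs.block.size", "268435456");

    Map<String, String> hadoopConfig =
        FileSystemUtils.toHadoopConfigMap(
            gravitinoConfig, GCSFileSystemProvider.GRAVITINO_KEY_TO_GCS_HADOOP_KEY);

    // Expected contents:
    //   fs.gs.auth.service.account.json.keyfile=/tmp/service-account.json
    //   fs.gs.block.size=268435456
    hadoopConfig.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}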
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.storage;

public class GCSProperties {

// The path of service account JSON file of Google Cloud Storage.
public static final String GCS_SERVICE_ACCOUNT_JSON_PATH = "gcs-service-account-file";

private GCSProperties() {}
}
@@ -26,7 +26,7 @@ public class OSSProperties {
// The static access key ID used to access OSS data.
public static final String GRAVITINO_OSS_ACCESS_KEY_ID = "oss-access-key-id";
// The static access key secret used to access OSS data.
public static final String GRAVITINO_OSS_ACCESS_KEY_SECRET = "oss-access-key-secret";
public static final String GRAVITINO_OSS_ACCESS_KEY_SECRET = "oss-secret-access-key";

private OSSProperties() {}
}
@@ -35,5 +35,8 @@ public class S3Properties {
// S3 external id
public static final String GRAVITINO_S3_EXTERNAL_ID = "s3-external-id";

// The S3 credentials provider class name.
public static final String GRAVITINO_S3_CREDS_PROVIDER = "s3-creds-provider";

private S3Properties() {}
}
4 changes: 4 additions & 0 deletions catalogs/catalog-hadoop/build.gradle.kts
@@ -36,6 +36,10 @@ dependencies {
exclude(group = "*")
}

implementation(project(":catalogs:catalog-common")) {
exclude(group = "*")
}

compileOnly(libs.guava)

implementation(libs.hadoop3.common) {
@@ -32,6 +32,20 @@
*/
public interface FileSystemProvider {

/**
* The prefix that marks a configuration key as a bypass key; the prefix is stripped before the
* remaining key is set on the FileSystem instance.
*
* <p>For example, if the configuration key passed to {@link
* FileSystemProvider#getFileSystem(Path, Map)} is 'gravitino.bypass.fs.s3a.endpoint', the prefix
* 'gravitino.bypass.' is removed and 'fs.s3a.endpoint' is set on the FileSystem instance.
*
* <p>Users can use this prefix to pass configuration items that are not defined in Gravitino.
*/
String GRAVITINO_BYPASS = "gravitino.bypass.";

/**
* Get the FileSystem instance according to the configuration map and file path.
*
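A minimal, self-contained sketch of the prefix handling described above; the project's actual handling lives in FileSystemUtils.toHadoopConfigMap, so this is only an illustration of the semantics:

import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;

public class BypassPrefixSketch {
  public static void main(String[] args) {
    String raw = "gravitino.bypass.fs.s3a.endpoint";

    // Strip the bypass prefix before handing the key to the Hadoop Configuration.
    String hadoopKey =
        raw.startsWith(FileSystemProvider.GRAVITINO_BYPASS)
            ? raw.substring(FileSystemProvider.GRAVITINO_BYPASS.length())
            : raw;

    System.out.println(hadoopKey); // prints: fs.s3a.endpoint
  }
}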
@@ -20,6 +20,7 @@

import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.BUILTIN_HDFS_FS_PROVIDER;
import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.BUILTIN_LOCAL_FS_PROVIDER;
import static org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider.GRAVITINO_BYPASS;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -101,4 +102,62 @@ public static FileSystemProvider getFileSystemProviderByName(
"File system provider with name '%s' not found in the file system provider list.",
fileSystemProviderName)));
}

/**
* Convert the Gravitino configuration to Hadoop configuration.
*
* <p>Predefined keys have the highest priority. If a key is not among the predefined keys, it is
* copied to the configuration as-is. Keys with the prefix 'gravitino.bypass.' have the lowest
* priority.
*
* <p>Consider the following example:
*
* <pre>
* config:
* k1=v1
* gravitino.bypass.k1=v2
* custom-k1=v3
* predefinedKeys:
* custom-k1=k1
* then the result will be:
* k1=v3
* </pre>
*
* @param config Gravitino configuration
* @return Hadoop configuration Map
*/
public static Map<String, String> toHadoopConfigMap(
Map<String, String> config, Map<String, String> predefinedKeys) {
Map<String, String> result = Maps.newHashMap();

// First, add the keys that start with 'gravitino.bypass' to the result map, as they have the
// lowest priority.
config.forEach(
(k, v) -> {
if (k.startsWith(GRAVITINO_BYPASS)) {
String key = k.replace(GRAVITINO_BYPASS, "");
result.put(key, v);
}
});

// Then add the keys that are neither in the predefined keys nor start with 'gravitino.bypass'
// to the result map.
config.forEach(
(k, v) -> {
if (!predefinedKeys.containsKey(k) && !k.startsWith(GRAVITINO_BYPASS)) {
result.put(k, v);
}
});

// Last, map the predefined keys to their Hadoop counterparts and add them to the result map.
config.forEach(
(k, v) -> {
if (predefinedKeys.containsKey(k)) {
String key = predefinedKeys.get(k);
result.put(key, v);
}
});

return result;
}
}
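
To make the priority order above concrete, here is a small sketch that runs the exact scenario from the Javadoc example (class and variable names are illustrative):

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;

public class ConfigPrioritySketch {
  public static void main(String[] args) {
    Map<String, String> config =
        ImmutableMap.of(
            "k1", "v1",
            "gravitino.bypass.k1", "v2",
            "custom-k1", "v3");
    Map<String, String> predefinedKeys = ImmutableMap.of("custom-k1", "k1");

    Map<String, String> result = FileSystemUtils.toHadoopConfigMap(config, predefinedKeys);

    // Bypass keys are applied first, plain keys overwrite them, and predefined mappings win
    // last, so the expected output is {k1=v3}.
    System.out.println(result);
  }
}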
@@ -41,6 +41,7 @@
import com.google.common.collect.Maps;
import java.io.File;
import java.io.IOException;
import java.net.ConnectException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Map;
@@ -796,6 +797,39 @@ public void testTestConnection() {
ImmutableMap.of()));
}

@Test
void testTrailSlash() throws IOException {
try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) {

String location = "hdfs://localhost:9000";
Map<String, String> catalogProperties = Maps.newHashMap();
catalogProperties.put(HadoopCatalogPropertiesMetadata.LOCATION, location);

ops.initialize(catalogProperties, randomCatalogInfo(), HADOOP_PROPERTIES_METADATA);

String schemaName = "schema1024";
NameIdentifier nameIdentifier = NameIdentifierUtil.ofSchema("m1", "c1", schemaName);

Map<String, String> schemaProperties = Maps.newHashMap();
schemaProperties.put(HadoopCatalogPropertiesMetadata.LOCATION, "hdfs://localhost:9000/user1");
StringIdentifier stringId = StringIdentifier.fromId(idGenerator.nextId());
schemaProperties =
Maps.newHashMap(StringIdentifier.newPropertiesWithId(stringId, schemaProperties));

Map<String, String> finalSchemaProperties = schemaProperties;

// If not fixed by #5296, this method will throw java.lang.IllegalArgumentException:
// java.net.URISyntaxException: Relative path in absolute URI: hdfs://localhost:9000schema1024
// After #5296, this method will throw java.lang.RuntimeException: Failed to create
// schema m1.c1.schema1024 location hdfs://localhost:9000/user1
Exception exception =
Assertions.assertThrows(
Exception.class,
() -> ops.createSchema(nameIdentifier, "comment", finalSchemaProperties));
Assertions.assertTrue(exception.getCause() instanceof ConnectException);
}
}

@Test
public void testGetFileLocation() throws IOException {
String schemaName = "schema1024";
Expand Down