Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#5973] feat(hadoop-catalog): Support using dynamic credential when using fileset with cloud storage #5974

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bundles/aliyun-bundle/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ dependencies {
implementation(project(":catalogs:catalog-common")) {
exclude("*")
}
implementation(project(":clients:client-java-runtime", configuration = "shadow"))
}

tasks.withType(ShadowJar::class.java) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.common.Properties;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.storage.OSSProperties;
Expand Down Expand Up @@ -61,6 +62,13 @@ public FileSystem getFileSystem(Path path, Map<String, String> config) throws IO
}

hadoopConfMap.forEach(configuration::set);

if (config.containsKey(Properties.USE_GRAVITINO_CLOUD_STORE_CREDENTIAL)
&& Boolean.parseBoolean(config.get(Properties.USE_GRAVITINO_CLOUD_STORE_CREDENTIAL))) {
configuration.set(
"fs.oss.credentials.provider", OSSSessionCredentialProvider.class.getName());
}

return AliyunOSSFileSystem.newInstance(path.toUri(), configuration);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.oss.fs;

import static org.apache.gravitino.credential.OSSTokenCredential.GRAVITINO_OSS_SESSION_ACCESS_KEY_ID;
import static org.apache.gravitino.credential.OSSTokenCredential.GRAVITINO_OSS_SESSION_SECRET_ACCESS_KEY;
import static org.apache.gravitino.credential.OSSTokenCredential.GRAVITINO_OSS_TOKEN;

import com.aliyun.oss.common.auth.BasicCredentials;
import com.aliyun.oss.common.auth.Credentials;
import com.aliyun.oss.common.auth.CredentialsProvider;
import java.net.URI;
import java.util.Map;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.client.GravitinoClient;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.S3TokenCredential;
import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.file.FilesetCatalog;
import org.apache.hadoop.conf.Configuration;

/**
 * An Aliyun OSS {@link CredentialsProvider} that fetches short-lived session credentials for a
 * Gravitino fileset from the Gravitino server and caches them until shortly before expiration.
 *
 * <p>Instantiated reflectively by hadoop-aliyun via the {@code fs.oss.credentials.provider}
 * configuration key, which is why the {@code (URI, Configuration)} constructor must be kept.
 */
public class OSSSessionCredentialProvider implements CredentialsProvider {

  // Refresh this long before the real expiration so in-flight requests never
  // run on credentials that lapse mid-call.
  private static final long EXPIRATION_BUFFER_MS = 5 * 60 * 1000L;

  private final String filesetIdentifier;
  private final GravitinoClient client;

  // Cached credentials. volatile so the unsynchronized fast-path read in
  // getCredentials() observes the values published by refresh().
  private volatile BasicCredentials basicCredentials;
  private volatile long expirationTime;

  /**
   * @param uri the fileset URI (unused; required by the hadoop-aliyun reflective contract)
   * @param conf Hadoop configuration carrying the fileset identifier and Gravitino endpoint
   */
  public OSSSessionCredentialProvider(URI uri, Configuration conf) {
    // Extract the target fileset and build the Gravitino client from configuration.
    this.filesetIdentifier = conf.get("gravitino.fileset.identifier");
    String metalake = conf.get("fs.gravitino.client.metalake");
    String gravitinoServer = conf.get("fs.gravitino.server.uri");

    this.client =
        GravitinoClient.builder(gravitinoServer).withMetalake(metalake).withSimpleAuth().build();
  }

  @Override
  public void setCredentials(Credentials credentials) {}

  @Override
  public Credentials getCredentials() {
    // Fast path: reuse cached credentials while they are comfortably valid.
    if (needsRefresh()) {
      synchronized (this) {
        // Re-check under the lock so concurrent callers trigger exactly one refresh
        // instead of each refreshing in turn.
        if (needsRefresh()) {
          refresh();
        }
      }
    }

    return basicCredentials;
  }

  /** Whether the cache is empty or within the pre-expiration buffer. */
  private boolean needsRefresh() {
    return basicCredentials == null
        || System.currentTimeMillis() > expirationTime - EXPIRATION_BUFFER_MS;
  }

  /** Fetches fresh session credentials from Gravitino and updates the cache. */
  private void refresh() {
    // filesetIdentifier format: "metalake.catalog.schema.fileset"
    String[] idents = filesetIdentifier.split("\\.");
    String catalog = idents[1];

    FilesetCatalog filesetCatalog = client.loadCatalog(catalog).asFilesetCatalog();

    @SuppressWarnings("unused")
    Fileset fileset = filesetCatalog.loadFileset(NameIdentifier.of(idents[2], idents[3]));
    // TODO(review): placeholder until fileset credential vending is wired up:
    // Credential credentials = fileset.supportsCredentials().getCredential("s3-token");

    Credential credentials =
        new S3TokenCredential("AS", "NF", "FwoGZXIvYXdzEDMaDBf3ltl7HG6K7Ne7QS", 1735033800000L);

    Map<String, String> credentialMap = credentials.credentialInfo();
    // NOTE(review): the mock above is an S3TokenCredential, whose credentialInfo()
    // presumably exposes S3 keys — so these GRAVITINO_OSS_* lookups will likely be
    // null until the mock is replaced with an OSS credential. Confirm before merge.
    String accessKeyId = credentialMap.get(GRAVITINO_OSS_SESSION_ACCESS_KEY_ID);
    String secretAccessKey = credentialMap.get(GRAVITINO_OSS_SESSION_SECRET_ACCESS_KEY);
    String sessionToken = credentialMap.get(GRAVITINO_OSS_TOKEN);

    this.basicCredentials = new BasicCredentials(accessKeyId, secretAccessKey, sessionToken);
    this.expirationTime = credentials.expireTimeInMs();
  }
}
1 change: 1 addition & 0 deletions bundles/aws-bundle/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ dependencies {
implementation(project(":catalogs:catalog-common")) {
exclude("*")
}
implementation(project(":clients:client-java-runtime", configuration = "shadow"))
}

tasks.withType(ShadowJar::class.java) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.common.Properties;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.storage.S3Properties;
Expand All @@ -48,10 +49,17 @@ public FileSystem getFileSystem(Path path, Map<String, String> config) throws IO
FileSystemUtils.toHadoopConfigMap(config, GRAVITINO_KEY_TO_S3_HADOOP_KEY);

if (!hadoopConfMap.containsKey(Constants.AWS_CREDENTIALS_PROVIDER)) {
configuration.set(
hadoopConfMap.put(
Constants.AWS_CREDENTIALS_PROVIDER, Constants.ASSUMED_ROLE_CREDENTIALS_DEFAULT);
}
hadoopConfMap.forEach(configuration::set);

if (config.containsKey(Properties.USE_GRAVITINO_CLOUD_STORE_CREDENTIAL)
&& Boolean.parseBoolean(config.get(Properties.USE_GRAVITINO_CLOUD_STORE_CREDENTIAL))) {
configuration.set(
"fs.s3a.aws.credentials.provider", S3SessionCredentialProvider.class.getName());
}

return S3AFileSystem.newInstance(path.toUri(), configuration);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.s3.fs;

import static org.apache.gravitino.credential.S3TokenCredential.GRAVITINO_S3_SESSION_ACCESS_KEY_ID;
import static org.apache.gravitino.credential.S3TokenCredential.GRAVITINO_S3_SESSION_SECRET_ACCESS_KEY;
import static org.apache.gravitino.credential.S3TokenCredential.GRAVITINO_S3_TOKEN;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicSessionCredentials;
import java.net.URI;
import java.util.Map;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.client.GravitinoClient;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.S3TokenCredential;
import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.file.FilesetCatalog;
import org.apache.hadoop.conf.Configuration;

/**
 * An {@link AWSCredentialsProvider} that fetches short-lived S3 session credentials for a
 * Gravitino fileset from the Gravitino server and caches them until shortly before expiration.
 *
 * <p>Instantiated reflectively by hadoop-aws via the {@code fs.s3a.aws.credentials.provider}
 * configuration key, which is why the {@code (URI, Configuration)} constructor must be kept.
 */
public class S3SessionCredentialProvider implements AWSCredentialsProvider {

  // Refresh this long before the real expiration so in-flight requests never
  // run on credentials that lapse mid-call.
  private static final long EXPIRATION_BUFFER_MS = 5 * 60 * 1000L;

  private final GravitinoClient client;
  private final String filesetIdentifier;

  // Cached credentials. volatile so the unsynchronized fast-path read in
  // getCredentials() observes the values published by refresh().
  private volatile BasicSessionCredentials basicSessionCredentials;
  private volatile long expirationTime;

  /**
   * @param uri the fileset URI (unused; required by the hadoop-aws reflective contract)
   * @param conf Hadoop configuration carrying the fileset identifier and Gravitino endpoint
   */
  public S3SessionCredentialProvider(final URI uri, final Configuration conf) {
    // Extract the target fileset and build the Gravitino client from configuration.
    this.filesetIdentifier = conf.get("gravitino.fileset.identifier");
    String metalake = conf.get("fs.gravitino.client.metalake");
    String gravitinoServer = conf.get("fs.gravitino.server.uri");

    // TODO, support auth between client and server.
    this.client =
        GravitinoClient.builder(gravitinoServer).withMetalake(metalake).withSimpleAuth().build();
  }

  @Override
  public AWSCredentials getCredentials() {
    // Fast path: reuse cached credentials while they are comfortably valid.
    if (needsRefresh()) {
      synchronized (this) {
        // Re-check under the lock so concurrent callers trigger exactly one refresh
        // instead of each refreshing in turn.
        if (needsRefresh()) {
          refresh();
        }
      }
    }

    return basicSessionCredentials;
  }

  /** Whether the cache is empty or within the pre-expiration buffer. */
  private boolean needsRefresh() {
    return basicSessionCredentials == null
        || System.currentTimeMillis() > expirationTime - EXPIRATION_BUFFER_MS;
  }

  /** Fetches fresh session credentials from Gravitino and updates the cache. */
  @Override
  public void refresh() {
    // filesetIdentifier format: "metalake.catalog.schema.fileset"
    // (the code below reads idents[2] as the schema and idents[3] as the fileset name)
    String[] idents = filesetIdentifier.split("\\.");
    String catalog = idents[1];

    FilesetCatalog filesetCatalog = client.loadCatalog(catalog).asFilesetCatalog();

    @SuppressWarnings("unused")
    Fileset fileset = filesetCatalog.loadFileset(NameIdentifier.of(idents[2], idents[3]));
    // TODO(review): placeholder until fileset credential vending is wired up:
    // Credential credentials = fileset.supportsCredentials().getCredential("s3-token");

    Credential credentials = new S3TokenCredential("ASIAZ6", "NFzd", "xx", 1735033800000L);

    Map<String, String> credentialMap = credentials.credentialInfo();
    String accessKeyId = credentialMap.get(GRAVITINO_S3_SESSION_ACCESS_KEY_ID);
    String secretAccessKey = credentialMap.get(GRAVITINO_S3_SESSION_SECRET_ACCESS_KEY);
    String sessionToken = credentialMap.get(GRAVITINO_S3_TOKEN);

    this.basicSessionCredentials =
        new BasicSessionCredentials(accessKeyId, secretAccessKey, sessionToken);
    this.expirationTime = credentials.expireTimeInMs();
  }
}
1 change: 1 addition & 0 deletions bundles/azure-bundle/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ dependencies {
implementation(project(":catalogs:catalog-common")) {
exclude("*")
}
implementation(project(":clients:client-java-runtime", configuration = "shadow"))
}

tasks.withType(ShadowJar::class.java) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,22 @@

package org.apache.gravitino.abs.fs;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import javax.annotation.Nonnull;
import org.apache.gravitino.catalog.hadoop.common.Properties;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.storage.AzureProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.services.AuthType;

public class AzureFileSystemProvider implements FileSystemProvider {

Expand Down Expand Up @@ -62,6 +67,17 @@ public FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map<String, String>
configuration.set(ABFS_IMPL_KEY, ABFS_IMPL);
}

if (config.containsKey(Properties.USE_GRAVITINO_CLOUD_STORE_CREDENTIAL)
&& Boolean.parseBoolean(config.get(Properties.USE_GRAVITINO_CLOUD_STORE_CREDENTIAL))) {
String pathString = path.toString();
String accountSuffix = pathString.split("@")[1].split("/")[0];

configuration.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SAS.name());
configuration.set(
FS_AZURE_SAS_TOKEN_PROVIDER_TYPE + "." + accountSuffix,
AzureSasCredentialProvider.class.getName());
}

hadoopConfMap.forEach(configuration::set);

return FileSystem.get(path.toUri(), configuration);
Expand Down
Loading
Loading