Skip to content

Commit

Permalink
Fix again.
Browse files Browse the repository at this point in the history
  • Loading branch information
yuqi1129 committed Dec 26, 2024
1 parent d7a031c commit 5b648e8
Show file tree
Hide file tree
Showing 17 changed files with 561 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public long expireTimeInMs() {
public Map<String, String> credentialInfo() {
return (new ImmutableMap.Builder<String, String>())
.put(GRAVITINO_ADLS_SAS_TOKEN, sasToken)
.put(GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME, accountName)
.build();
}

Expand Down
1 change: 1 addition & 0 deletions bundles/aliyun-bundle/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ dependencies {
exclude("*")
}
implementation(project(":clients:client-java-runtime", configuration = "shadow"))
implementation(project(":clients:filesystem-hadoop3-runtime", configuration = "shadow"))
}

tasks.withType(ShadowJar::class.java) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.common.Properties;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration;
import org.apache.gravitino.storage.OSSProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -61,14 +61,16 @@ public FileSystem getFileSystem(Path path, Map<String, String> config) throws IO
hadoopConfMap.put(OSS_FILESYSTEM_IMPL, AliyunOSSFileSystem.class.getCanonicalName());
}

hadoopConfMap.forEach(configuration::set);

if (config.containsKey(Properties.USE_GRAVITINO_CLOUD_STORE_CREDENTIAL)
&& Boolean.parseBoolean(config.get(Properties.USE_GRAVITINO_CLOUD_STORE_CREDENTIAL))) {
configuration.set(
"fs.oss.credentials.provider", OSSSessionCredentialProvider.class.getName());
if (!hadoopConfMap.containsKey(Constants.CREDENTIALS_PROVIDER_KEY)
&& config.containsKey(
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY)) {
hadoopConfMap.put(
Constants.CREDENTIALS_PROVIDER_KEY,
OSSSessionCredentialProvider.class.getCanonicalName());
}

hadoopConfMap.forEach(configuration::set);

return AliyunOSSFileSystem.newInstance(path.toUri(), configuration);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,32 +26,35 @@
import com.aliyun.oss.common.auth.BasicCredentials;
import com.aliyun.oss.common.auth.Credentials;
import com.aliyun.oss.common.auth.CredentialsProvider;
import com.aliyun.oss.common.auth.DefaultCredentials;
import java.net.URI;
import java.util.Map;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.client.GravitinoClient;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.S3TokenCredential;
import org.apache.gravitino.credential.OSSTokenCredential;
import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.file.FilesetCatalog;
import org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem;
import org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.aliyun.oss.Constants;

public class OSSSessionCredentialProvider implements CredentialsProvider {

private BasicCredentials basicCredentials;
private String filesetIdentifier;
private Credentials basicCredentials;
private final String filesetIdentifier;
private long expirationTime;
private GravitinoClient client;
private final GravitinoClient client;
private final Configuration configuration;

public OSSSessionCredentialProvider(URI uri, Configuration conf) {

this.filesetIdentifier =
conf.get(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_IDENTIFIER);
// extra value and init Gravitino client here
this.filesetIdentifier = conf.get("gravitino.fileset.identifier");
String metalake = conf.get("fs.gravitino.client.metalake");
String gravitinoServer = conf.get("fs.gravitino.server.uri");

this.client =
GravitinoClient.builder(gravitinoServer).withMetalake(metalake).withSimpleAuth().build();
GravitinoVirtualFileSystem gravitinoVirtualFileSystem = new GravitinoVirtualFileSystem();
this.client = gravitinoVirtualFileSystem.initializeClient(conf);
this.configuration = conf;
}

@Override
Expand All @@ -70,26 +73,42 @@ public Credentials getCredentials() {
}

private void refresh() {
// Refresh the credentials
String[] idents = filesetIdentifier.split("\\.");
String catalog = idents[1];

FilesetCatalog filesetCatalog = client.loadCatalog(catalog).asFilesetCatalog();

@SuppressWarnings("unused")
Fileset fileset = filesetCatalog.loadFileset(NameIdentifier.of(idents[2], idents[3]));
// Should mock
// Credential credentials = fileset.supportsCredentials().getCredential("s3-token");
// Use dynamic credential by default.

Credential[] credentials = fileset.supportsCredentials().getCredentials();
if (credentials.length == 0) {
expirationTime = Long.MAX_VALUE;
this.basicCredentials =
new DefaultCredentials(
configuration.get(Constants.ACCESS_KEY_ID),
configuration.get(Constants.ACCESS_KEY_SECRET));
return;
}

Credential credentials =
new S3TokenCredential("AS", "NF", "FwoGZXIvYXdzEDMaDBf3ltl7HG6K7Ne7QS", 1735033800000L);
// Use the first one.
Credential credential = credentials[0];
Map<String, String> credentialMap = credential.toProperties();

Map<String, String> credentialMap = credentials.credentialInfo();
String accessKeyId = credentialMap.get(GRAVITINO_OSS_SESSION_ACCESS_KEY_ID);
String secretAccessKey = credentialMap.get(GRAVITINO_OSS_SESSION_SECRET_ACCESS_KEY);
String sessionToken = credentialMap.get(GRAVITINO_OSS_TOKEN);

this.basicCredentials = new BasicCredentials(accessKeyId, secretAccessKey, sessionToken);
this.expirationTime = credentials.expireTimeInMs();
if (OSSTokenCredential.OSS_TOKEN_CREDENTIAL_TYPE.equals(
credentialMap.get(Credential.CREDENTIAL_TYPE))) {
String sessionToken = credentialMap.get(GRAVITINO_OSS_TOKEN);
this.basicCredentials = new BasicCredentials(accessKeyId, secretAccessKey, sessionToken);
} else {
this.basicCredentials = new DefaultCredentials(accessKeyId, secretAccessKey);
}

this.expirationTime = credential.expireTimeInMs();
if (expirationTime <= 0) {
expirationTime = Long.MAX_VALUE;
}
}
}
1 change: 1 addition & 0 deletions bundles/aws-bundle/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dependencies {
exclude("*")
}
implementation(project(":clients:client-java-runtime", configuration = "shadow"))
implementation(project(":clients:filesystem-hadoop3-runtime", configuration = "shadow"))
}

tasks.withType(ShadowJar::class.java) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.common.Properties;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration;
import org.apache.gravitino.storage.S3Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -48,18 +48,14 @@ public FileSystem getFileSystem(Path path, Map<String, String> config) throws IO
Map<String, String> hadoopConfMap =
FileSystemUtils.toHadoopConfigMap(config, GRAVITINO_KEY_TO_S3_HADOOP_KEY);

if (!hadoopConfMap.containsKey(Constants.AWS_CREDENTIALS_PROVIDER)) {
if (!hadoopConfMap.containsKey(Constants.AWS_CREDENTIALS_PROVIDER)
&& config.containsKey(
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY)) {
hadoopConfMap.put(
Constants.AWS_CREDENTIALS_PROVIDER, Constants.ASSUMED_ROLE_CREDENTIALS_DEFAULT);
}
hadoopConfMap.forEach(configuration::set);

if (config.containsKey(Properties.USE_GRAVITINO_CLOUD_STORE_CREDENTIAL)
&& Boolean.parseBoolean(config.get(Properties.USE_GRAVITINO_CLOUD_STORE_CREDENTIAL))) {
configuration.set(
"fs.s3a.aws.credentials.provider", S3SessionCredentialProvider.class.getName());
Constants.AWS_CREDENTIALS_PROVIDER, S3SessionCredentialProvider.class.getCanonicalName());
}

hadoopConfMap.forEach(configuration::set);
return S3AFileSystem.newInstance(path.toUri(), configuration);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.BasicSessionCredentials;
import java.net.URI;
import java.util.Map;
Expand All @@ -34,30 +35,32 @@
import org.apache.gravitino.credential.S3TokenCredential;
import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.file.FilesetCatalog;
import org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem;
import org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.Constants;

public class S3SessionCredentialProvider implements AWSCredentialsProvider {

private final GravitinoClient client;
private final String filesetIdentifier;
private final Configuration configuration;

private BasicSessionCredentials basicSessionCredentials;
private AWSCredentials basicSessionCredentials;
private long expirationTime;

public S3SessionCredentialProvider(final URI uri, final Configuration conf) {
// extra value and init Gravitino client here
this.filesetIdentifier = conf.get("gravitino.fileset.identifier");
String metalake = conf.get("fs.gravitino.client.metalake");
String gravitinoServer = conf.get("fs.gravitino.server.uri");
this.filesetIdentifier =
conf.get(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_IDENTIFIER);
this.configuration = conf;

// TODO, support auth between client and server.
this.client =
GravitinoClient.builder(gravitinoServer).withMetalake(metalake).withSimpleAuth().build();
// extra value and init Gravitino client here
GravitinoVirtualFileSystem gravitinoVirtualFileSystem = new GravitinoVirtualFileSystem();
this.client = gravitinoVirtualFileSystem.initializeClient(conf);
}

@Override
public AWSCredentials getCredentials() {

// Refresh credentials if they are null or about to expire in 5 minutes
if (basicSessionCredentials == null
|| System.currentTimeMillis() > expirationTime - 5 * 60 * 1000) {
Expand All @@ -77,20 +80,36 @@ public void refresh() {

FilesetCatalog filesetCatalog = client.loadCatalog(catalog).asFilesetCatalog();

@SuppressWarnings("unused")
Fileset fileset = filesetCatalog.loadFileset(NameIdentifier.of(idents[2], idents[3]));
// Should mock
// Credential credentials = fileset.supportsCredentials().getCredential("s3-token");
Credential[] credentials = fileset.supportsCredentials().getCredentials();

// Can't find any credential, use the default one.
if (credentials.length == 0) {
expirationTime = Long.MAX_VALUE;
this.basicSessionCredentials =
new BasicAWSCredentials(
configuration.get(Constants.ACCESS_KEY), configuration.get(Constants.SECRET_KEY));
return;
}

Credential credentials = new S3TokenCredential("ASIAZ6", "NFzd", "xx", 1735033800000L);
Credential credential = credentials[0];
Map<String, String> credentialMap = credential.toProperties();

Map<String, String> credentialMap = credentials.credentialInfo();
String accessKeyId = credentialMap.get(GRAVITINO_S3_SESSION_ACCESS_KEY_ID);
String secretAccessKey = credentialMap.get(GRAVITINO_S3_SESSION_SECRET_ACCESS_KEY);
String sessionToken = credentialMap.get(GRAVITINO_S3_TOKEN);

this.basicSessionCredentials =
new BasicSessionCredentials(accessKeyId, secretAccessKey, sessionToken);
this.expirationTime = credentials.expireTimeInMs();
if (S3TokenCredential.S3_TOKEN_CREDENTIAL_TYPE.equals(
credentialMap.get(Credential.CREDENTIAL_TYPE))) {
String sessionToken = credentialMap.get(GRAVITINO_S3_TOKEN);
this.basicSessionCredentials =
new BasicSessionCredentials(accessKeyId, secretAccessKey, sessionToken);
} else {
this.basicSessionCredentials = new BasicAWSCredentials(accessKeyId, secretAccessKey);
}

this.expirationTime = credential.expireTimeInMs();
if (expirationTime <= 0) {
expirationTime = Long.MAX_VALUE;
}
}
}
2 changes: 2 additions & 0 deletions bundles/azure-bundle/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ dependencies {
exclude("*")
}
implementation(project(":clients:client-java-runtime", configuration = "shadow"))

implementation(project(":clients:filesystem-hadoop3-runtime", configuration = "shadow"))
}

tasks.withType(ShadowJar::class.java) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,21 @@
import java.io.IOException;
import java.util.Map;
import javax.annotation.Nonnull;
import org.apache.gravitino.catalog.hadoop.common.Properties;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration;
import org.apache.gravitino.storage.AzureProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AzureFileSystemProvider implements FileSystemProvider {

private static final Logger LOGGER = LoggerFactory.getLogger(AzureFileSystemProvider.class);

@VisibleForTesting public static final String ABS_PROVIDER_SCHEME = "abfss";

@VisibleForTesting public static final String ABS_PROVIDER_NAME = "abs";
Expand Down Expand Up @@ -67,19 +71,36 @@ public FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map<String, String>
configuration.set(ABFS_IMPL_KEY, ABFS_IMPL);
}

if (config.containsKey(Properties.USE_GRAVITINO_CLOUD_STORE_CREDENTIAL)
&& Boolean.parseBoolean(config.get(Properties.USE_GRAVITINO_CLOUD_STORE_CREDENTIAL))) {
String pathString = path.toString();
String accountSuffix = pathString.split("@")[1].split("/")[0];
hadoopConfMap.forEach(configuration::set);

configuration.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SAS.name());
configuration.set(
FS_AZURE_SAS_TOKEN_PROVIDER_TYPE + "." + accountSuffix,
AzureSasCredentialProvider.class.getName());
// Check whether this is from GVFS client.
if (config.containsKey(GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY)) {
// Test whether SAS works
try {
AzureSasCredentialProvider azureSasCredentialProvider = new AzureSasCredentialProvider();
azureSasCredentialProvider.initialize(configuration, null);
String sas = azureSasCredentialProvider.getSASToken(null, null, null, null);
if (sas != null) {
configuration.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SAS.name());
configuration.set(
FS_AZURE_SAS_TOKEN_PROVIDER_TYPE + ".dfs.core.windows.net",
AzureSasCredentialProvider.class.getName());
} else if (azureSasCredentialProvider.getAzureStorageAccountKey() != null
&& azureSasCredentialProvider.getAzureStorageAccountName() != null) {
configuration.set(
String.format(
"fs.azure.account.key.%s.dfs.core.windows.net",
azureSasCredentialProvider.getAzureStorageAccountName()),
azureSasCredentialProvider.getAzureStorageAccountKey());
}
} catch (Exception e) {
// Can't use SAS, use account key and account key instead
LOGGER.warn(
"Failed to use SAS token and user account from credential provider, use default conf. ",
e);
}
}

hadoopConfMap.forEach(configuration::set);

return FileSystem.get(path.toUri(), configuration);
}

Expand Down
Loading

0 comments on commit 5b648e8

Please sign in to comment.