Skip to content

Commit

Permalink
support Aliyun oss credential provider apache#5625
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Nov 27, 2024
1 parent e89d80f commit 3656d1b
Show file tree
Hide file tree
Showing 13 changed files with 868 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.credential;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;

/** OSS token credential. */
public class OSSTokenCredential implements Credential {

/** OSS token credential type. */
public static final String OSS_TOKEN_CREDENTIAL_TYPE = "oss-token";
/** OSS session access key ID used to access OSS data. */
public static final String GRAVITINO_OSS_SESSION_ACCESS_KEY_ID = "oss-access-key-id";
/** OSS session secret access key used to access OSS data. */
public static final String GRAVITINO_OSS_SESSION_SECRET_ACCESS_KEY = "oss-secret-access-key";
/** OSS session token. */
public static final String GRAVITINO_OSS_TOKEN = "oss-security-token";

private final String accessKeyId;
private final String secretAccessKey;
private final String securityToken;
private final long expireTimeInMS;

/**
* Constructs an instance of {@link OSSTokenCredential} with session secret key and token.
*
* @param accessKeyId The oss session access key ID.
* @param secretAccessKey The oss session secret access key.
* @param securityToken The oss security token.
* @param expireTimeInMS The oss session token expire time in ms.
*/
public OSSTokenCredential(
String accessKeyId, String secretAccessKey, String securityToken, long expireTimeInMS) {
Preconditions.checkArgument(
StringUtils.isNotBlank(accessKeyId), "OSS access key Id should not be empty");
Preconditions.checkArgument(
StringUtils.isNotBlank(secretAccessKey), "OSS secret access key should not be empty");
Preconditions.checkArgument(
StringUtils.isNotBlank(securityToken), "OSS security token should not be empty");

this.accessKeyId = accessKeyId;
this.secretAccessKey = secretAccessKey;
this.securityToken = securityToken;
this.expireTimeInMS = expireTimeInMS;
}

@Override
public String credentialType() {
return OSS_TOKEN_CREDENTIAL_TYPE;
}

@Override
public long expireTimeInMs() {
return expireTimeInMS;
}

@Override
public Map<String, String> credentialInfo() {
return (new ImmutableMap.Builder<String, String>())
.put(GRAVITINO_OSS_SESSION_ACCESS_KEY_ID, accessKeyId)
.put(GRAVITINO_OSS_SESSION_SECRET_ACCESS_KEY, secretAccessKey)
.put(GRAVITINO_OSS_TOKEN, securityToken)
.build();
}

/**
* Get oss session access key ID.
*
* @return The oss access key ID.
*/
public String accessKeyId() {
return accessKeyId;
}

/**
* Get oss session secret access key.
*
* @return The oss secret access key.
*/
public String secretAccessKey() {
return secretAccessKey;
}

/**
* Get oss session token.
*
* @return The oss session token.
*/
public String securityToken() {
return securityToken;
}
}
7 changes: 7 additions & 0 deletions bundles/aliyun-bundle/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,15 @@ plugins {
}

dependencies {
compileOnly(project(":api"))
compileOnly(project(":core"))
compileOnly(project(":catalogs:catalog-common"))
compileOnly(project(":catalogs:catalog-hadoop"))
compileOnly(libs.hadoop3.common)

implementation(libs.aliyun.oss.sdk)
implementation(libs.aliyun.credentials.sdk)
implementation(libs.aliyun.kms.sdk)
implementation(libs.hadoop3.oss)

// oss needs StringUtils from commons-lang3 or the following error will occur in 3.3.0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.oss.credential;

import com.aliyun.credentials.Client;
import com.aliyun.credentials.models.Config;
import com.aliyun.credentials.models.CredentialModel;
import com.aliyun.credentials.utils.AuthConstant;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.CredentialContext;
import org.apache.gravitino.credential.CredentialProvider;
import org.apache.gravitino.credential.OSSTokenCredential;
import org.apache.gravitino.credential.PathBasedCredentialContext;
import org.apache.gravitino.credential.config.OSSCredentialConfig;
import org.apache.gravitino.oss.credential.policy.Condition;
import org.apache.gravitino.oss.credential.policy.Effect;
import org.apache.gravitino.oss.credential.policy.Policy;
import org.apache.gravitino.oss.credential.policy.Statement;
import org.apache.gravitino.oss.credential.policy.StringLike;

/** Generates OSS token to access OSS data. */
public class OSSTokenProvider implements CredentialProvider {
private final ObjectMapper objectMapper = new ObjectMapper();
private String accessKeyId;
private String secretAccessKey;
private String roleArn;
private String externalID;
private int tokenExpireSecs;

/**
* Initializes the credential provider with catalog properties.
*
* @param properties catalog properties that can be used to configure the provider. The specific
* properties required vary by implementation.
*/
@Override
public void initialize(Map<String, String> properties) {
OSSCredentialConfig credentialConfig = new OSSCredentialConfig(properties);
this.roleArn = credentialConfig.ossRoleArn();
this.externalID = credentialConfig.externalID();
this.tokenExpireSecs = credentialConfig.tokenExpireInSecs();
this.accessKeyId = credentialConfig.accessKeyID();
this.secretAccessKey = credentialConfig.secretAccessKey();
}

/**
* Returns the type of credential, it should be identical in Gravitino.
*
* @return A string identifying the type of credentials.
*/
@Override
public String credentialType() {
return OSSTokenCredential.OSS_TOKEN_CREDENTIAL_TYPE;
}

/**
* Obtains a credential based on the provided context information.
*
* @param context A context object providing necessary information for retrieving credentials.
* @return A Credential object containing the authentication information needed to access a system
* or resource. Null will be returned if no credential is available.
*/
@Nullable
@Override
public Credential getCredential(CredentialContext context) {
if (!(context instanceof PathBasedCredentialContext)) {
return null;
}
PathBasedCredentialContext pathBasedCredentialContext = (PathBasedCredentialContext) context;
CredentialModel credentialModel =
createOSSCredentialModel(
roleArn,
pathBasedCredentialContext.getReadPaths(),
pathBasedCredentialContext.getWritePaths(),
pathBasedCredentialContext.getUserName());
return new OSSTokenCredential(
credentialModel.accessKeyId,
credentialModel.accessKeySecret,
credentialModel.securityToken,
credentialModel.expiration);
}

private CredentialModel createOSSCredentialModel(
String roleArn, Set<String> readLocations, Set<String> writeLocations, String userName) {
Config config = new Config();
config.setAccessKeyId(accessKeyId);
config.setAccessKeySecret(secretAccessKey);
config.setType(AuthConstant.RAM_ROLE_ARN);
config.setRoleArn(roleArn);
config.setRoleName(getRoleName(userName));
if (StringUtils.isNotBlank(externalID)) {
config.setExternalId(externalID);
}
config.setRoleSessionExpiration(tokenExpireSecs);
config.setPolicy(createPolicy(readLocations, writeLocations));
return new Client(config).getCredential();
}

// reference:
// https://help.aliyun.com/zh/oss/user-guide/tutorial-use-ram-policies-to-control-access-to-oss?spm=a2c4g.11186623.help-menu-31815.d_2_4_6_1.76901df8T7gfl8
public String createPolicy(Set<String> readLocations, Set<String> writeLocations) {
Policy.Builder policyBuilder = Policy.builder();

// Allow read and write access to the specified locations
Statement.Builder allowGetObjectStatementBuilder =
Statement.builder()
.effect(Effect.ALLOW)
.addAction("oss:GetObject")
.addAction("oss:GetObjectVersion");
// Add support for bucket-level policies
Map<String, Statement.Builder> bucketListStatementBuilder = new HashMap<>();
Map<String, Statement.Builder> bucketGetLocationStatementBuilder = new HashMap<>();

String arnPrefix = getArnPrefix();
Stream.concat(readLocations.stream(), writeLocations.stream())
.distinct()
.forEach(
location -> {
URI uri = URI.create(location);
allowGetObjectStatementBuilder.addResource(getOssUriWithArn(arnPrefix, uri));
String bucketArn = arnPrefix + getBucketName(uri);
// ListBucket
bucketListStatementBuilder.computeIfAbsent(
bucketArn,
key ->
Statement.builder()
.effect(Effect.ALLOW)
.addAction("oss:ListBucket")
.addResource(key)
.condition(getCondition(uri)));
// GetBucketLocation
bucketGetLocationStatementBuilder.computeIfAbsent(
bucketArn,
key ->
Statement.builder()
.effect(Effect.ALLOW)
.addAction("oss:GetBucketLocation")
.addResource(key));
});

if (!writeLocations.isEmpty()) {
Statement.Builder allowPutObjectStatementBuilder =
Statement.builder()
.effect(Effect.ALLOW)
.addAction("oss:PutObject")
.addAction("oss:DeleteObject");
writeLocations.forEach(
location -> {
URI uri = URI.create(location);
allowPutObjectStatementBuilder.addResource(getOssUriWithArn(arnPrefix, uri));
});
policyBuilder.addStatement(allowPutObjectStatementBuilder.build());
}

if (!bucketListStatementBuilder.isEmpty()) {
bucketListStatementBuilder
.values()
.forEach(statementBuilder -> policyBuilder.addStatement(statementBuilder.build()));
} else {
// add list privilege with 0 resources
policyBuilder.addStatement(
Statement.builder().effect(Effect.ALLOW).addAction("oss:ListBucket").build());
}
bucketGetLocationStatementBuilder
.values()
.forEach(statementBuilder -> policyBuilder.addStatement(statementBuilder.build()));

policyBuilder.addStatement(allowGetObjectStatementBuilder.build());
try {
return objectMapper.writeValueAsString(policyBuilder.build());
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

private Condition getCondition(URI uri) {
return Condition.builder()
.stringLike(
StringLike.builder()
.addPrefix(concatPathWithSep(trimLeadingSlash(uri.getPath()), "*", "/"))
.build())
.build();
}

private String getArnPrefix() {
return "acs:oss:*:*:";
}

private String getBucketName(URI uri) {
return uri.getHost();
}

private String getOssUriWithArn(String arnPrefix, URI uri) {
return arnPrefix + concatPathWithSep(removeSchemaFromOSSUri(uri), "*", "/");
}

private static String concatPathWithSep(String leftPath, String rightPath, String fileSep) {
if (leftPath.endsWith(fileSep) && rightPath.startsWith(fileSep)) {
return leftPath + rightPath.substring(1);
} else if (!leftPath.endsWith(fileSep) && !rightPath.startsWith(fileSep)) {
return leftPath + fileSep + rightPath;
} else {
return leftPath + rightPath;
}
}

// Transform 'oss://bucket/path' to /bucket/path
private String removeSchemaFromOSSUri(URI uri) {
String bucket = uri.getHost();
String path = trimLeadingSlash(uri.getPath());
return String.join(
"/", Stream.of(bucket, path).filter(Objects::nonNull).toArray(String[]::new));
}

private String trimLeadingSlash(String path) {
return path.startsWith("/") ? path.substring(1) : path;
}

private static String getRoleName(String userName) {
return "gravitino_" + userName;
}

/**
* Closes this stream and releases any system resources associated with it. If the stream is
* already closed then invoking this method has no effect.
*
* <p>As noted in {@link AutoCloseable#close()}, cases where the close may fail require careful
* attention. It is strongly advised to relinquish the underlying resources and to internally
* <em>mark</em> the {@code Closeable} as closed, prior to throwing the {@code IOException}.
*
* @throws IOException if an I/O error occurs
*/
@Override
public void close() throws IOException {}
}
Loading

0 comments on commit 3656d1b

Please sign in to comment.