[apache#3337] improvement(hadoop-catalog): Support user impersonation for Hadoop catalog. (apache#3352)

### What changes were proposed in this pull request?

Add user impersonation for the Hadoop catalog.

### Why are the changes needed?

We need authentication support for secured (Kerberos-enabled) HDFS clusters.

Fix: apache#3337

### Does this PR introduce _any_ user-facing change?

N/A. 

### How was this patch tested?

UT (to be added).
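As a sketch of how the pieces below fit together, these catalog properties would enable Kerberos authentication with impersonation. Only `authentication.enable` and `authentication.type` are defined in this diff (see `AuthenticationConfig` below); the principal, keytab, and impersonation keys are assumed names, since `KerberosConfig` is not among the shown files.

```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class CatalogPropertiesSketch {
  public static void main(String[] args) {
    // Only the first two keys are confirmed by AuthenticationConfig in this
    // diff; the remaining keys are assumed names for KerberosConfig entries.
    Map<String, String> properties =
        ImmutableMap.of(
            "authentication.enable", "true",
            "authentication.type", "kerberos",
            "authentication.kerberos.principal", "gravitino@EXAMPLE.COM", // assumed key
            "authentication.kerberos.keytab-uri", "file:///etc/gravitino.keytab", // assumed key
            "authentication.impersonation-enable", "true"); // assumed key
    properties.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
```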
yuqi1129 authored and diqiu50 committed Jun 13, 2024
1 parent 6fdfc88 commit 161d7fb
Showing 12 changed files with 893 additions and 1 deletion.
3 changes: 3 additions & 0 deletions catalogs/catalog-hadoop/build.gradle.kts
@@ -34,6 +34,9 @@ dependencies {
testImplementation(project(":server"))
testImplementation(project(":server-common"))

testImplementation(libs.minikdc)
testImplementation(libs.hadoop3.minicluster)

testImplementation(libs.bundles.log4j)
testImplementation(libs.mockito.core)
testImplementation(libs.mysql.driver)
@@ -4,10 +4,13 @@
*/
package com.datastrato.gravitino.catalog.hadoop;

import com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosConfig;
import com.datastrato.gravitino.connector.BaseCatalog;
import com.datastrato.gravitino.connector.CatalogOperations;
import com.datastrato.gravitino.connector.ProxyPlugin;
import com.datastrato.gravitino.connector.capability.Capability;
import java.util.Map;
import java.util.Optional;

/**
* Hadoop catalog is a fileset catalog that can manage filesets on the Hadoop Compatible File
@@ -31,4 +34,13 @@ protected CatalogOperations newOps(Map<String, String> config) {
protected Capability newCapability() {
return new HadoopCatalogCapability();
}

@Override
protected Optional<ProxyPlugin> newProxyPlugin(Map<String, String> config) {
boolean impersonationEnabled = new KerberosConfig(config).isImpersonationEnabled();
if (!impersonationEnabled) {
return Optional.empty();
}
return Optional.of(new HadoopProxyPlugin());
}
}
@@ -5,6 +5,7 @@
package com.datastrato.gravitino.catalog.hadoop;

import static com.datastrato.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;

import com.datastrato.gravitino.Entity;
import com.datastrato.gravitino.EntityStore;
@@ -14,9 +15,12 @@
import com.datastrato.gravitino.Schema;
import com.datastrato.gravitino.SchemaChange;
import com.datastrato.gravitino.StringIdentifier;
import com.datastrato.gravitino.catalog.hadoop.kerberos.AuthenticationConfig;
import com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosClient;
import com.datastrato.gravitino.connector.CatalogInfo;
import com.datastrato.gravitino.connector.CatalogOperations;
import com.datastrato.gravitino.connector.PropertiesMetadata;
import com.datastrato.gravitino.connector.ProxyPlugin;
import com.datastrato.gravitino.connector.SupportsSchemas;
import com.datastrato.gravitino.exceptions.AlreadyExistsException;
import com.datastrato.gravitino.exceptions.FilesetAlreadyExistsException;
@@ -36,6 +40,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
@@ -47,6 +52,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -72,7 +78,15 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchemas {

@VisibleForTesting Optional<Path> catalogStorageLocation;

// Raw catalog configuration, also used to initialize authentication.
private Map<String, String> conf;

@SuppressWarnings("unused")
private ProxyPlugin proxyPlugin;

private String kerberosRealm;

private CatalogInfo catalogInfo;

HadoopCatalogOperations(EntityStore store) {
this.store = store;
}
@@ -81,10 +95,16 @@ public HadoopCatalogOperations() {
this(GravitinoEnv.getInstance().entityStore());
}

public String getKerberosRealm() {
return kerberosRealm;
}

@Override
public void initialize(Map<String, String> config, CatalogInfo info) throws RuntimeException {
// Initialize Hadoop Configuration.
this.conf = config;
this.hadoopConf = new Configuration();
this.catalogInfo = info;
Map<String, String> bypassConfigs =
config.entrySet().stream()
.filter(e -> e.getKey().startsWith(CATALOG_BYPASS_PREFIX))
@@ -98,9 +118,31 @@ public void initialize(Map<String, String> config, CatalogInfo info) throws RuntimeException {
(String)
CATALOG_PROPERTIES_METADATA.getOrDefault(
config, HadoopCatalogPropertiesMetadata.LOCATION);
conf.forEach(hadoopConf::set);

initAuthentication(conf, hadoopConf);

this.catalogStorageLocation = Optional.ofNullable(catalogLocation).map(Path::new);
}

private void initAuthentication(Map<String, String> conf, Configuration hadoopConf) {
AuthenticationConfig config = new AuthenticationConfig(conf);
boolean enableAuth = config.isEnableAuth();
String authType = config.getAuthType();

if (enableAuth && StringUtils.equalsIgnoreCase(authType, "kerberos")) {
hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(hadoopConf);
try {
KerberosClient kerberosClient = new KerberosClient(conf, hadoopConf);
File keytabFile = kerberosClient.saveKeyTabFileFromUri(catalogInfo.id());
this.kerberosRealm = kerberosClient.login(keytabFile.getAbsolutePath());
} catch (IOException e) {
throw new RuntimeException("Failed to login with kerberos", e);
}
}
}

@Override
public NameIdentifier[] listFilesets(Namespace namespace) throws NoSuchSchemaException {
try {
@@ -609,4 +651,8 @@ static Path formalizePath(Path path, Configuration configuration) throws IOException {
FileSystem defaultFs = FileSystem.get(configuration);
return path.makeQualified(defaultFs.getUri(), defaultFs.getWorkingDirectory());
}

void setProxyPlugin(HadoopProxyPlugin hadoopProxyPlugin) {
this.proxyPlugin = hadoopProxyPlugin;
}
}
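`KerberosClient`, used by `initAuthentication` above, is not among the shown files. A plausible sketch of the login step it performs, assuming it wraps Hadoop's `UserGroupInformation` keytab login and derives the realm from the configured principal:

```java
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.security.UserGroupInformation;

// A plausible sketch only: KerberosClient's source is not in this diff, and
// how it obtains the principal is an assumption.
public class KerberosLoginSketch {
  public static String login(String principal, File keytabFile) throws IOException {
    // Authenticate this process against the KDC using the keytab.
    UserGroupInformation.loginUserFromKeytab(principal, keytabFile.getAbsolutePath());
    // Derive the realm ("EXAMPLE.COM" in "user@EXAMPLE.COM") for proxy-user naming.
    return principal.substring(principal.indexOf('@') + 1);
  }
}
```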
@@ -4,6 +4,8 @@
*/
package com.datastrato.gravitino.catalog.hadoop;

import com.datastrato.gravitino.catalog.hadoop.kerberos.AuthenticationConfig;
import com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosConfig;
import com.datastrato.gravitino.connector.BaseCatalogPropertiesMetadata;
import com.datastrato.gravitino.connector.PropertyEntry;
import com.google.common.collect.ImmutableMap;
@@ -29,6 +31,9 @@ public class HadoopCatalogPropertiesMetadata extends BaseCatalogPropertiesMetadata {
null,
false /* hidden */))
.putAll(BASIC_CATALOG_PROPERTY_ENTRIES)
// The following two are about authentication.
.putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES)
.putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES)
.build();

@Override
@@ -0,0 +1,74 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.catalog.hadoop;

import com.datastrato.gravitino.connector.CatalogOperations;
import com.datastrato.gravitino.connector.ProxyPlugin;
import com.datastrato.gravitino.utils.Executable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import org.apache.hadoop.security.UserGroupInformation;

public class HadoopProxyPlugin implements ProxyPlugin {
private HadoopCatalogOperations ops;
private UserGroupInformation realUser;

public HadoopProxyPlugin() {
try {
realUser = UserGroupInformation.getCurrentUser();
} catch (IOException ioe) {
throw new IllegalStateException("Fail to init HadoopCatalogProxyPlugin");
}
}

@Override
public Object doAs(
Principal principal, Executable<Object, Exception> action, Map<String, String> properties)
throws Throwable {
try {
UserGroupInformation proxyUser;

if (UserGroupInformation.isSecurityEnabled() && ops != null) {
// The Gravitino server may use multiple KDC servers: HTTP authentication
// uses one KDC, while the Hadoop catalog may use another. The
// KerberosAuthenticator strips the realm from the principal, so we append
// the Hadoop catalog's realm to the proxy user name here.
String proxyKerberosPrincipalName = principal.getName();
if (!proxyKerberosPrincipalName.contains("@")) {
proxyKerberosPrincipalName =
String.format("%s@%s", proxyKerberosPrincipalName, ops.getKerberosRealm());
}

proxyUser = UserGroupInformation.createProxyUser(proxyKerberosPrincipalName, realUser);
} else {
proxyUser = UserGroupInformation.createProxyUser(principal.getName(), realUser);
}

return proxyUser.doAs((PrivilegedExceptionAction<Object>) action::execute);
} catch (UndeclaredThrowableException e) {
Throwable innerException = e.getCause();
if (innerException instanceof PrivilegedActionException) {
throw innerException.getCause();
} else if (innerException instanceof InvocationTargetException) {
throw innerException.getCause();
} else {
throw innerException;
}
}
}

@Override
public void bindCatalogOperation(CatalogOperations ops) {
this.ops = ((HadoopCatalogOperations) ops);
this.ops.setProxyPlugin(this);
}
}
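A minimal sketch of how a caller might drive `doAs` once the framework has bound the plugin through `bindCatalogOperation`; the principal and the action are illustrative, not part of this change:

```java
import com.google.common.collect.ImmutableMap;
import java.security.Principal;
import javax.security.auth.kerberos.KerberosPrincipal;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class ProxyPluginUsageSketch {
  public static void main(String[] args) throws Throwable {
    HadoopProxyPlugin plugin = new HadoopProxyPlugin();
    // Illustrative principal; in the server it comes from the authenticated request.
    Principal caller = new KerberosPrincipal("alice@EXAMPLE.COM");
    // The action runs as a proxy user for "alice" on top of the real (login) user.
    Object uri =
        plugin.doAs(
            caller,
            () -> FileSystem.get(new Configuration()).getUri(),
            ImmutableMap.of());
    System.out.println(uri);
  }
}
```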
@@ -0,0 +1,69 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.catalog.hadoop.kerberos;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.config.ConfigBuilder;
import com.datastrato.gravitino.config.ConfigConstants;
import com.datastrato.gravitino.config.ConfigEntry;
import com.datastrato.gravitino.connector.PropertyEntry;
import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class AuthenticationConfig extends Config {
public static final String ENABLE_AUTH_KEY = "authentication.enable";
public static final String AUTH_TYPE_KEY = "authentication.type";

public AuthenticationConfig(Map<String, String> properties) {
super(false);
loadFromMap(properties, k -> true);
}

public static final ConfigEntry<Boolean> ENABLE_AUTH_ENTRY =
new ConfigBuilder(ENABLE_AUTH_KEY)
.doc("Whether to enable authentication for Hadoop catalog")
.version(ConfigConstants.VERSION_0_5_1)
.booleanConf()
.createWithDefault(false);

public static final ConfigEntry<String> AUTH_TYPE_ENTRY =
new ConfigBuilder(AUTH_TYPE_KEY)
.doc("The type of authentication for Hadoop catalog, currently we only support kerberos")
.version(ConfigConstants.VERSION_0_5_1)
.stringConf()
.create();

public boolean isEnableAuth() {
return get(ENABLE_AUTH_ENTRY);
}

public String getAuthType() {
return get(AUTH_TYPE_ENTRY);
}

public static final Map<String, PropertyEntry<?>> AUTHENTICATION_PROPERTY_ENTRIES =
new ImmutableMap.Builder<String, PropertyEntry<?>>()
.put(
ENABLE_AUTH_KEY,
PropertyEntry.booleanPropertyEntry(
ENABLE_AUTH_KEY,
"Whether to enable authentication for Hadoop catalog",
false,
true,
false,
false,
false))
.put(
AUTH_TYPE_KEY,
PropertyEntry.stringImmutablePropertyEntry(
AUTH_TYPE_KEY,
"The type of authentication for Hadoop catalog, currently we only support kerberos",
false,
null,
false,
false))
.build();
}
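A minimal usage sketch for this class, using only the keys and accessors it defines:

```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class AuthenticationConfigSketch {
  public static void main(String[] args) {
    Map<String, String> props =
        ImmutableMap.of(
            AuthenticationConfig.ENABLE_AUTH_KEY, "true",
            AuthenticationConfig.AUTH_TYPE_KEY, "kerberos");
    AuthenticationConfig config = new AuthenticationConfig(props);
    System.out.println(config.isEnableAuth()); // true
    System.out.println(config.getAuthType());  // kerberos
  }
}
```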
@@ -0,0 +1,51 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.hadoop.kerberos;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.util.Optional;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FetchFileUtils {

private FetchFileUtils() {}

public static void fetchFileFromUri(
String fileUri, File destFile, int timeout, Configuration conf) throws IOException {
try {
URI uri = new URI(fileUri);
String scheme = Optional.ofNullable(uri.getScheme()).orElse("file");

switch (scheme) {
case "http":
case "https":
case "ftp":
FileUtils.copyURLToFile(uri.toURL(), destFile, timeout * 1000, timeout * 1000);
break;

case "file":
Files.createSymbolicLink(destFile.toPath(), new File(uri.getPath()).toPath());
break;

case "hdfs":
FileSystem.get(conf).copyToLocalFile(new Path(uri), new Path(destFile.toURI()));
break;

default:
throw new IllegalArgumentException(
String.format("Unsupported scheme: %s", scheme));
}
} catch (URISyntaxException ue) {
throw new IllegalArgumentException("The uri of file has the wrong format", ue);
}
}
}
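A minimal usage sketch; the URIs and destination path are illustrative. Note that a URI with no scheme defaults to `file`, which is materialized as a symbolic link rather than a copy:

```java
import java.io.File;
import org.apache.hadoop.conf.Configuration;

public class FetchFileUtilsSketch {
  public static void main(String[] args) throws Exception {
    File dest = new File("/tmp/gravitino.keytab"); // illustrative destination
    // Copies the keytab from HDFS to the local destination; for http/https/ftp
    // the timeout (in seconds) bounds both the connect and read phases.
    FetchFileUtils.fetchFileFromUri(
        "hdfs://nameservice1/keytabs/gravitino.keytab", dest, 60, new Configuration());
  }
}
```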