Skip to content

Commit

Permalink
[apache#772] fix(kerberos): cache proxy user ugi to avoid memory leak (
Browse files Browse the repository at this point in the history
…apache#773)

### What changes were proposed in this pull request?

1. Cache the proxy user UGI to avoid a memory leak.

### Why are the changes needed?

Fix: apache#772 

Too many Hadoop filesystem instances are created and kept in the filesystem cache,
which causes a memory leak in the shuffle server.

As we know, the filesystem cache key is built from the scheme, authority, and UGI.
The scheme and authority do not change between calls. But for the UGI, every call to
`createProxyUser` creates a new instance, which means that every invocation of `FileSystem.get()`
adds a new entry to the cache because the key is different.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?
1. Existing UTs
2. Added tests
  • Loading branch information
zuston authored and jerqi committed Apr 13, 2023
1 parent 6857ea5 commit 89c2b92
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,20 @@

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.ThreadUtils;

public class HadoopSecurityContext implements SecurityContext {
Expand All @@ -39,6 +42,12 @@ public class HadoopSecurityContext implements SecurityContext {
private UserGroupInformation loginUgi;
private ScheduledExecutorService refreshScheduledExecutor;

// The purpose of the proxy user ugi cache is to prevent the creation of
// multiple cache keys for the same user, scheme, and authority in the Hadoop filesystem.
// Without this cache, large amounts of unnecessary filesystem instances could be stored in memory,
// leading to potential memory leaks. For more information on this issue, refer to #706.
private Map<String, UserGroupInformation> proxyUserUgiPool;

public HadoopSecurityContext(
String krb5ConfPath,
String keytabFile,
Expand Down Expand Up @@ -76,6 +85,7 @@ public HadoopSecurityContext(
refreshIntervalSec,
refreshIntervalSec,
TimeUnit.SECONDS);
proxyUserUgiPool = JavaUtils.newConcurrentMap();
}

private void authRefresh() {
Expand All @@ -95,8 +105,10 @@ public <T> T runSecured(String user, Callable<T> securedCallable) throws Excepti

// Run with the proxy user.
if (!user.equals(loginUgi.getShortUserName())) {
UserGroupInformation proxyUserUgi =
proxyUserUgiPool.computeIfAbsent(user, x -> UserGroupInformation.createProxyUser(x, loginUgi));
return executeWithUgiWrapper(
UserGroupInformation.createProxyUser(user, loginUgi),
proxyUserUgi,
securedCallable
);
}
Expand All @@ -114,10 +126,20 @@ private <T> T executeWithUgiWrapper(UserGroupInformation ugi, Callable<T> callab
return ugi.doAs((PrivilegedExceptionAction<T>) callable::call);
}

/**
 * Exposes the proxy user ugi cache so that tests can inspect its contents.
 *
 * @return the map held in the {@code proxyUserUgiPool} field itself (not a copy);
 *     {@code null} once {@link #close()} has dropped the cache
 */
@VisibleForTesting
Map<String, UserGroupInformation> getProxyUserUgiPool() {
  return this.proxyUserUgiPool;
}

/**
 * Releases what this security context holds: calls {@code shutdown()} on the refresh
 * executor when one exists, then empties the proxy user ugi cache and drops the
 * reference to it.
 *
 * @throws IOException declared by the overridden method; nothing in this body throws it
 */
@Override
public void close() throws IOException {
  final ScheduledExecutorService refresher = this.refreshScheduledExecutor;
  if (refresher != null) {
    refresher.shutdown();
  }

  if (this.proxyUserUgiPool != null) {
    // Empty the cache before releasing the reference to it.
    this.proxyUserUgiPool.clear();
    this.proxyUserUgiPool = null;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
package org.apache.uniffle.common.security;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -66,13 +69,28 @@ public void testSecuredCallable() throws Exception {

// case3: run by the proxy user
Path pathWithAlexUser = new Path("/alex/HadoopSecurityContextTest");
AtomicReference<UserGroupInformation> ugi1 = new AtomicReference<>();
context.runSecured("alex", (Callable<Void>) () -> {
ugi1.set(UserGroupInformation.getCurrentUser());
kerberizedHdfs.getFileSystem().mkdirs(pathWithAlexUser);
return null;
});
fileStatus = kerberizedHdfs.getFileSystem().getFileStatus(pathWithAlexUser);
assertEquals("alex", fileStatus.getOwner());

// case4: run by the proxy user again, it will always return the same
// ugi and filesystem instance.
AtomicReference<UserGroupInformation> ugi2 = new AtomicReference<>();
context.runSecured("alex", (Callable<Void>) () -> {
ugi2.set(UserGroupInformation.getCurrentUser());
return null;
});
assertTrue(ugi1.get() == ugi2.get());
assertTrue(ugi1.get() == context.getProxyUserUgiPool().get("alex"));

FileSystem fileSystem1 = context.runSecured("alex", () -> FileSystem.get(kerberizedHdfs.getConf()));
FileSystem fileSystem2 = context.runSecured("alex", () -> FileSystem.get(kerberizedHdfs.getConf()));
assertTrue(fileSystem1 == fileSystem2);
}
}

Expand Down

0 comments on commit 89c2b92

Please sign in to comment.