diff --git a/common/src/main/java/org/apache/uniffle/common/security/HadoopSecurityContext.java b/common/src/main/java/org/apache/uniffle/common/security/HadoopSecurityContext.java index 49083d365c..1d8349f9cb 100644 --- a/common/src/main/java/org/apache/uniffle/common/security/HadoopSecurityContext.java +++ b/common/src/main/java/org/apache/uniffle/common/security/HadoopSecurityContext.java @@ -19,17 +19,20 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.uniffle.common.util.JavaUtils; import org.apache.uniffle.common.util.ThreadUtils; public class HadoopSecurityContext implements SecurityContext { @@ -39,6 +42,12 @@ public class HadoopSecurityContext implements SecurityContext { private UserGroupInformation loginUgi; private ScheduledExecutorService refreshScheduledExecutor; + // The purpose of the proxy user ugi cache is to prevent the creation of + // multiple cache keys for the same user, scheme, and authority in the Hadoop filesystem. + // Without this cache, large numbers of unnecessary filesystem instances could be stored in memory, + // leading to potential memory leaks. For more information on this issue, refer to #706. 
+ private Map proxyUserUgiPool; + public HadoopSecurityContext( String krb5ConfPath, String keytabFile, @@ -76,6 +85,7 @@ public HadoopSecurityContext( refreshIntervalSec, refreshIntervalSec, TimeUnit.SECONDS); + proxyUserUgiPool = JavaUtils.newConcurrentMap(); } private void authRefresh() { @@ -95,8 +105,10 @@ public T runSecured(String user, Callable securedCallable) throws Excepti // Run with the proxy user. if (!user.equals(loginUgi.getShortUserName())) { + UserGroupInformation proxyUserUgi = + proxyUserUgiPool.computeIfAbsent(user, x -> UserGroupInformation.createProxyUser(x, loginUgi)); return executeWithUgiWrapper( - UserGroupInformation.createProxyUser(user, loginUgi), + proxyUserUgi, securedCallable ); } @@ -114,10 +126,20 @@ private T executeWithUgiWrapper(UserGroupInformation ugi, Callable callab return ugi.doAs((PrivilegedExceptionAction) callable::call); } + // Only for tests + @VisibleForTesting + Map getProxyUserUgiPool() { + return proxyUserUgiPool; + } + @Override public void close() throws IOException { if (refreshScheduledExecutor != null) { refreshScheduledExecutor.shutdown(); } + if (proxyUserUgiPool != null) { + proxyUserUgiPool.clear(); + proxyUserUgiPool = null; + } } } diff --git a/common/src/test/java/org/apache/uniffle/common/security/HadoopSecurityContextTest.java b/common/src/test/java/org/apache/uniffle/common/security/HadoopSecurityContextTest.java index 53ad36b231..38a34db1f6 100644 --- a/common/src/test/java/org/apache/uniffle/common/security/HadoopSecurityContextTest.java +++ b/common/src/test/java/org/apache/uniffle/common/security/HadoopSecurityContextTest.java @@ -18,10 +18,13 @@ package org.apache.uniffle.common.security; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; 
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -66,13 +69,28 @@ public void testSecuredCallable() throws Exception { // case3: run by the proxy user Path pathWithAlexUser = new Path("/alex/HadoopSecurityContextTest"); + AtomicReference ugi1 = new AtomicReference<>(); context.runSecured("alex", (Callable) () -> { + ugi1.set(UserGroupInformation.getCurrentUser()); kerberizedHdfs.getFileSystem().mkdirs(pathWithAlexUser); return null; }); fileStatus = kerberizedHdfs.getFileSystem().getFileStatus(pathWithAlexUser); assertEquals("alex", fileStatus.getOwner()); + // case4: run by the proxy user again, it will always return the same + // ugi and filesystem instance. + AtomicReference ugi2 = new AtomicReference<>(); + context.runSecured("alex", (Callable) () -> { + ugi2.set(UserGroupInformation.getCurrentUser()); + return null; + }); + assertTrue(ugi1.get() == ugi2.get()); + assertTrue(ugi1.get() == context.getProxyUserUgiPool().get("alex")); + + FileSystem fileSystem1 = context.runSecured("alex", () -> FileSystem.get(kerberizedHdfs.getConf())); + FileSystem fileSystem2 = context.runSecured("alex", () -> FileSystem.get(kerberizedHdfs.getConf())); + assertTrue(fileSystem1 == fileSystem2); } }