diff --git a/common/src/main/java/org/apache/uniffle/common/security/HadoopSecurityContext.java b/common/src/main/java/org/apache/uniffle/common/security/HadoopSecurityContext.java index cb1da3c7e4..7da6e2e0f1 100644 --- a/common/src/main/java/org/apache/uniffle/common/security/HadoopSecurityContext.java +++ b/common/src/main/java/org/apache/uniffle/common/security/HadoopSecurityContext.java @@ -19,10 +19,13 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; @@ -38,6 +41,12 @@ public class HadoopSecurityContext implements SecurityContext { private UserGroupInformation loginUgi; private ScheduledExecutorService refreshScheduledExecutor; + // The purpose of the proxy user ugi cache is to prevent the creation of + // multiple cache keys for the same user, scheme, and authority in the Hadoop filesystem. + // Without this cache, large amounts of unnecessary filesystem instances could be stored in memory, + // leading to potential memory leaks. For more information on this issue, refer to #706. + private Map proxyUserUgiPool; + public HadoopSecurityContext( String krb5ConfPath, String keytabFile, @@ -72,6 +81,7 @@ public HadoopSecurityContext( refreshIntervalSec, refreshIntervalSec, TimeUnit.SECONDS); + proxyUserUgiPool = Maps.newConcurrentMap(); } private void authRefresh() { @@ -91,8 +101,10 @@ public T runSecured(String user, Callable securedCallable) throws Excepti // Run with the proxy user. if (!user.equals(loginUgi.getShortUserName())) { + UserGroupInformation proxyUserUgi = + proxyUserUgiPool.computeIfAbsent(user, x -> UserGroupInformation.createProxyUser(x, loginUgi)); return executeWithUgiWrapper( - UserGroupInformation.createProxyUser(user, loginUgi), + proxyUserUgi, securedCallable ); } @@ -110,10 +122,20 @@ private T executeWithUgiWrapper(UserGroupInformation ugi, Callable callab return ugi.doAs((PrivilegedExceptionAction) callable::call); } + // Only for tests + @VisibleForTesting + Map getProxyUserUgiPool() { + return proxyUserUgiPool; + } + @Override public void close() throws IOException { if (refreshScheduledExecutor != null) { refreshScheduledExecutor.shutdown(); } + if (proxyUserUgiPool != null) { + proxyUserUgiPool.clear(); + proxyUserUgiPool = null; + } } }