From d0ae1a9cc3127245ab04747219424e6b80777935 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Wed, 29 Mar 2023 11:00:58 +0800 Subject: [PATCH] [#772] fix(kerberos): cache proxy user ugi to avoid memory leak (#773) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Avoid a memory leak by caching the proxy user UGI. Fix: #772 Too many Hadoop filesystem instances are created and kept in the filesystem cache, which causes a memory leak in the shuffle server. The filesystem cache's key is built from the scheme, authority and UGI. The scheme and authority stay the same across calls, but `createProxyUser` always creates a new UGI, so every invocation of `FileSystem.get()` caches a new filesystem instance under a different key. No. 1. Existing UTs 2. Added tests --- .../security/HadoopSecurityContext.java | 24 ++++++++++++++++++- .../security/HadoopSecurityContextTest.java | 18 ++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/common/src/main/java/org/apache/uniffle/common/security/HadoopSecurityContext.java b/common/src/main/java/org/apache/uniffle/common/security/HadoopSecurityContext.java index 49083d365c..88e52ba1b5 100644 --- a/common/src/main/java/org/apache/uniffle/common/security/HadoopSecurityContext.java +++ b/common/src/main/java/org/apache/uniffle/common/security/HadoopSecurityContext.java @@ -19,11 +19,14 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; @@ -39,6 +42,12 @@ public class HadoopSecurityContext 
implements SecurityContext { private UserGroupInformation loginUgi; private ScheduledExecutorService refreshScheduledExecutor; + // The purpose of the proxy user ugi cache is to prevent the creation of + // multiple cache keys for the same user, scheme, and authority in the Hadoop filesystem. + // Without this cache, large amounts of unnecessary filesystem instances could be stored in memory, + // leading to potential memory leaks. For more information on this issue, refer to #706. + private Map proxyUserUgiPool; + public HadoopSecurityContext( String krb5ConfPath, String keytabFile, @@ -76,6 +85,7 @@ public HadoopSecurityContext( refreshIntervalSec, refreshIntervalSec, TimeUnit.SECONDS); + proxyUserUgiPool = Maps.newConcurrentMap(); } private void authRefresh() { @@ -95,8 +105,10 @@ public T runSecured(String user, Callable securedCallable) throws Excepti // Run with the proxy user. if (!user.equals(loginUgi.getShortUserName())) { + UserGroupInformation proxyUserUgi = + proxyUserUgiPool.computeIfAbsent(user, x -> UserGroupInformation.createProxyUser(x, loginUgi)); return executeWithUgiWrapper( - UserGroupInformation.createProxyUser(user, loginUgi), + proxyUserUgi, securedCallable ); } @@ -114,10 +126,20 @@ private T executeWithUgiWrapper(UserGroupInformation ugi, Callable callab return ugi.doAs((PrivilegedExceptionAction) callable::call); } + // Only for tests + @VisibleForTesting + Map getProxyUserUgiPool() { + return proxyUserUgiPool; + } + @Override public void close() throws IOException { if (refreshScheduledExecutor != null) { refreshScheduledExecutor.shutdown(); } + if (proxyUserUgiPool != null) { + proxyUserUgiPool.clear(); + proxyUserUgiPool = null; + } } } diff --git a/common/src/test/java/org/apache/uniffle/common/security/HadoopSecurityContextTest.java b/common/src/test/java/org/apache/uniffle/common/security/HadoopSecurityContextTest.java index 53ad36b231..38a34db1f6 100644 --- 
a/common/src/test/java/org/apache/uniffle/common/security/HadoopSecurityContextTest.java +++ b/common/src/test/java/org/apache/uniffle/common/security/HadoopSecurityContextTest.java @@ -18,10 +18,13 @@ package org.apache.uniffle.common.security; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -66,13 +69,28 @@ public void testSecuredCallable() throws Exception { // case3: run by the proxy user Path pathWithAlexUser = new Path("/alex/HadoopSecurityContextTest"); + AtomicReference ugi1 = new AtomicReference<>(); context.runSecured("alex", (Callable) () -> { + ugi1.set(UserGroupInformation.getCurrentUser()); kerberizedHdfs.getFileSystem().mkdirs(pathWithAlexUser); return null; }); fileStatus = kerberizedHdfs.getFileSystem().getFileStatus(pathWithAlexUser); assertEquals("alex", fileStatus.getOwner()); + // case4: run by the proxy user again, it will always return the same + // ugi and filesystem instance. + AtomicReference ugi2 = new AtomicReference<>(); + context.runSecured("alex", (Callable) () -> { + ugi2.set(UserGroupInformation.getCurrentUser()); + return null; + }); + assertTrue(ugi1.get() == ugi2.get()); + assertTrue(ugi1.get() == context.getProxyUserUgiPool().get("alex")); + + FileSystem fileSystem1 = context.runSecured("alex", () -> FileSystem.get(kerberizedHdfs.getConf())); + FileSystem fileSystem2 = context.runSecured("alex", () -> FileSystem.get(kerberizedHdfs.getConf())); + assertTrue(fileSystem1 == fileSystem2); } }