
Commit: refactor5
wsjz committed Jul 11, 2024
1 parent f65435a commit 5606297
Showing 13 changed files with 152 additions and 55 deletions.
@@ -2903,6 +2903,9 @@ public static int metaServiceRpcRetryTimes() {
     @ConfField(mutable = true)
     public static boolean enable_cooldown_replica_affinity = true;

+    @ConfField(description = {"Kerberos TGT ticket缓存更新周期",
+            "Kerberos TGT ticket cache update period"})
+    public static long kerberos_tgt_cache_renew_time = 3600 * 1000L;
     //==========================================================================
     // end of cloud config
     //==========================================================================
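To make the units concrete, a small standalone sketch (not part of this commit) of what the new default works out to; the * 24 multiplier reappears later in this commit's initAuthenticator:

// Standalone sketch: the renew period is declared in milliseconds.
public class RenewTimeSketch {
    public static void main(String[] args) {
        long renewMs = 3600 * 1000L;  // the default above: one hour
        long expireMs = renewMs * 24; // initAuthenticator below uses renew * 24 as the expiry bound
        System.out.println("renew  = " + renewMs / 60_000 + " minutes");
        System.out.println("expire = " + expireMs / 3_600_000 + " hours");
    }
}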
@@ -26,8 +26,6 @@ public interface HadoopAuthenticator {

     UserGroupInformation getUGI() throws IOException;

-    UserGroupInformation refreshUGI() throws IOException;
-
     <T> T ugiDoAs(PrivilegedExceptionAction<T> action) throws Exception;

     default void ugiDoAs(Runnable runnable) throws Exception {
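For context, a minimal caller-side sketch (assumed code, not part of this commit) of the surviving interface: callers hand ugiDoAs a PrivilegedExceptionAction and never touch the UGI directly.

import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UgiDoAsSketch {
    // Runs an HDFS existence check as the authenticated user.
    public static boolean pathExists(HadoopAuthenticator auth, Configuration conf, String path)
            throws Exception {
        // The lambda is the PrivilegedExceptionAction<Boolean>; refreshUGI is gone, so any
        // credential renewal must happen behind getUGI()/ugiDoAs.
        return auth.ugiDoAs(() -> FileSystem.get(conf).exists(new Path(path)));
    }
}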
@@ -19,7 +19,6 @@

 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -44,7 +43,6 @@ public class HadoopKerberosAuthenticator implements HadoopAuthenticator {
     public HadoopKerberosAuthenticator(KerberosAuthenticationConfig config) {
         this.config = config;
         try {
-            // TODO: add cache and add refresh
             ugi = getUGI();
         } catch (IOException e) {
             throw new RuntimeException(e);
@@ -54,11 +52,15 @@ public HadoopKerberosAuthenticator(KerberosAuthenticationConfig config) {
     public static void initializeAuthConfig(Configuration hadoopConf) {
         hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
         hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_KERBEROS_KEYTAB_LOGIN_AUTORENEWAL_ENABLED, "true");
-        UserGroupInformation.setConfiguration(hadoopConf);
+        synchronized (HadoopKerberosAuthenticator.class) {
+            // avoid other catalog set conf at the same time
+            UserGroupInformation.setConfiguration(hadoopConf);
+        }
     }

     @Override
     public UserGroupInformation getUGI() throws IOException {
+        // login and get ugi when catalog is initialized
         initializeAuthConfig(config.getConf());
         String principal = config.getKerberosPrincipal();
         Subject subject = getSubject(config.getKerberosKeytab(), principal);
@@ -101,14 +103,6 @@ public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
         };
     }

-    @Override
-    public UserGroupInformation refreshUGI() throws IOException {
-        if (ugi.hasKerberosCredentials() && StringUtils.equals(ugi.getUserName(), config.getKerberosPrincipal())) {
-            ugi.checkTGTAndReloginFromKeytab();
-        }
-        return ugi;
-    }
-
     @Override
     public <T> T ugiDoAs(PrivilegedExceptionAction<T> action) throws Exception {
         return ugi.doAs(action);
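For comparison, a sketch of the stock Hadoop keytab login (assumed values, not part of this commit). This class instead builds a JAAS Subject itself, and with refreshUGI removed, re-login is left to the new cache layer rather than explicit checkTGTAndReloginFromKeytab calls.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class KeytabLoginSketch {
    public static UserGroupInformation login(Configuration kerberosConf) throws IOException {
        // Must be a configuration with hadoop.security.authentication=kerberos.
        UserGroupInformation.setConfiguration(kerberosConf);
        // One-shot login from a keytab; returns a fresh UGI without replacing the static login user.
        return UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                "doris/_HOST@EXAMPLE.COM", "/etc/doris/doris.keytab"); // placeholder principal/keytab
    }
}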
@@ -42,6 +42,7 @@ public UserGroupInformation getUGI() {
             LOG.debug(AuthenticationConfig.HADOOP_USER_NAME + " is unset, use default user: hadoop");
         }
         try {
+            // get login user just for simple auth
             ugi = UserGroupInformation.getLoginUser();
             if (ugi.getUserName().equals(hadoopUserName)) {
                 return ugi;
@@ -55,12 +56,6 @@ public UserGroupInformation getUGI() {
         return ugi;
     }

-    @Override
-    public UserGroupInformation refreshUGI() throws IOException {
-        ugi = getUGI();
-        return ugi;
-    }
-
     @Override
     public <T> T ugiDoAs(PrivilegedExceptionAction<T> action) throws Exception {
         if (ugi != null) {
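A condensed sketch of the simple-auth flow visible in this hunk; the surrounding lines are cut off above, so the fallback branch here is an assumption, not commit content.

import java.io.IOException;
import org.apache.hadoop.security.UserGroupInformation;

public class SimpleAuthSketch {
    public static UserGroupInformation ugiFor(String hadoopUserName) throws IOException {
        // get login user just for simple auth (mirrors the comment added in this commit)
        UserGroupInformation login = UserGroupInformation.getLoginUser();
        if (login.getUserName().equals(hadoopUserName)) {
            return login;
        }
        // Assumed fallback: a remote user carries no Kerberos credentials; identity is asserted, not proven.
        return UserGroupInformation.createRemoteUser(hadoopUserName);
    }
}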
@@ -32,17 +32,31 @@ public static HadoopAuthenticator getHadoopAuthenticator(AuthenticationConfig co
         }
     }

+    public static <T> T ugiDoAs(UserGroupInformation ugi, PrivilegedExceptionAction<T> action) {
+        try {
+            if (ugi != null) {
+                return ugi.doAs(action);
+            } else {
+                return action.run();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
     /**
      * login and return hadoop ugi
      * @param config auth config
      * @return ugi
      */
+    @Deprecated
     private static UserGroupInformation loginWithUGI(AuthenticationConfig config) {
         if (config == null || !config.isValid()) {
             return null;
         }
         if (config instanceof KerberosAuthenticationConfig) {
             try {
+                // remove after iceberg and hudi kerberos test case pass
                 return new HadoopKerberosAuthenticator((KerberosAuthenticationConfig) config).getUGI();
             } catch (IOException e) {
                 throw new RuntimeException(e);
@@ -52,6 +66,7 @@ private static UserGroupInformation loginWithUGI(AuthenticationConfig config) {
         }
     }

+    @Deprecated
     public static <T> T ugiDoAs(AuthenticationConfig authConf, PrivilegedExceptionAction<T> action) {
         UserGroupInformation ugi = HadoopUGI.loginWithUGI(authConf);
         try {
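Usage sketch for the new static helper (assumed caller code, not part of this commit): a null UGI falls through to running the action directly, so callers need no null checks of their own.

import java.security.PrivilegedExceptionAction;
import org.apache.doris.common.security.authentication.HadoopUGI;
import org.apache.hadoop.security.UserGroupInformation;

public class HadoopUgiSketch {
    public static String shortNameUnder(UserGroupInformation ugi) {
        PrivilegedExceptionAction<String> action =
                () -> UserGroupInformation.getCurrentUser().getShortUserName();
        // Checked exceptions from the action surface as RuntimeException, per the helper above.
        return HadoopUGI.ugiDoAs(ugi, action);
    }
}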
@@ -35,14 +35,10 @@ public ImpersonatingHadoopAuthenticator(HadoopAuthenticator delegate, String use
     }

     @Override
-    public synchronized UserGroupInformation getUGI() throws IOException {
-        ugi = UserGroupInformation.createProxyUser(username, getUGI());
-        return ugi;
-    }
-
-    @Override
-    public UserGroupInformation refreshUGI() throws IOException {
-        ugi = delegate.refreshUGI();
+    public UserGroupInformation getUGI() throws IOException {
+        synchronized (ImpersonatingHadoopAuthenticator.class) {
+            ugi = UserGroupInformation.createProxyUser(username, delegate.getUGI());
+        }
         return ugi;
     }

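Note the rewrite above also removes the old self-recursive getUGI(), which called its own getUGI() instead of the delegate's. For context, a minimal proxy-user sketch (assumed names, not part of this commit); HDFS and HMS only honor this when the service principal is whitelisted via the hadoop.proxyuser.<service>.hosts/groups settings.

import org.apache.hadoop.security.UserGroupInformation;

public class ProxyUserSketch {
    // The kerberos-authenticated service UGI vouches for the impersonated end user.
    public static UserGroupInformation impersonate(UserGroupInformation serviceUgi, String endUser) {
        return UserGroupInformation.createProxyUser(endUser, serviceUgi);
    }
}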
@@ -18,6 +18,7 @@
 package org.apache.doris.datasource.hive;

 import org.apache.doris.analysis.TableName;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.DatabaseMetadata;
 import org.apache.doris.datasource.TableMetadata;
 import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
@@ -113,6 +114,8 @@ void updatePartitionStatistics(

     void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData);

+    void setHadoopAuthenticator(HadoopAuthenticator hadoopAuthenticator);
+
     /**
      * close the connection, eg, to hms
      */
@@ -33,11 +33,15 @@
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
 import org.apache.doris.common.security.authentication.HadoopAuthenticator;
+import org.apache.doris.common.security.authentication.HadoopUGI;
+import org.apache.doris.common.security.authentication.KerberosAuthenticationConfig;
+import org.apache.doris.common.security.authentication.SimpleAuthenticationConfig;
 import org.apache.doris.common.util.CacheBulkLoader;
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.datasource.CacheException;
 import org.apache.doris.datasource.FileSplit;
 import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
+import org.apache.doris.datasource.hive.security.CachingKerberosAuthenticator;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.fs.FileSystemCache;
 import org.apache.doris.fs.remote.RemoteFile;
@@ -120,7 +124,7 @@ public class HiveMetaStoreCache {
     // Other thread may reset this cache, so use AtomicReference to wrap it.
     private volatile AtomicReference<LoadingCache<FileCacheKey, FileCacheValue>> fileCacheRef
             = new AtomicReference<>();
-    private LoadingCache<AuthenticatorCacheKey, HadoopAuthenticator> authenticatorCache;
+    private CachingKerberosAuthenticator authenticator;

     public HiveMetaStoreCache(HMSExternalCatalog catalog,
             ExecutorService refreshExecutor, ExecutorService fileListingExecutor) {
@@ -165,7 +169,7 @@ public Map<PartitionCacheKey, HivePartition> loadAll(Iterable<? extends Partitio
         }, null, refreshExecutor);

         setNewFileCache();
-        setAuthenticatorCache();
+        initAuthenticator();
     }

     /***
@@ -207,14 +211,21 @@ public FileCacheValue load(FileCacheKey key) {
         }
     }

-    private void setAuthenticatorCache() {
+    private void initAuthenticator() {
         CacheFactory authCacheFactory = new CacheFactory(
-                OptionalLong.of(28800L),
-                OptionalLong.of(10 * 60L),
-                100,
-                false,
-                null);
-        authenticatorCache = authCacheFactory.buildCache(key -> refreshUgi, refreshExecutor);
+                OptionalLong.of(Config.kerberos_tgt_cache_renew_time * 24),
+                OptionalLong.of(Config.kerberos_tgt_cache_renew_time),
+                100,
+                false,
+                null);
+        AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(catalog.getConfiguration());
+        // Now just support ugi as auth cache value.
+        LoadingCache<AuthenticatorCacheKey, UserGroupInformation> authenticatorCache =
+                authCacheFactory.buildCache(key -> {
+                    HadoopAuthenticator auth = HadoopUGI.getHadoopAuthenticator(config);
+                    return auth.getUGI();
+                }, null, this.refreshExecutor);
+        authenticator = new CachingKerberosAuthenticator(config, authenticatorCache);
     }

     private void initMetrics() {
@@ -599,11 +610,10 @@ public void invalidateTableCache(String dbName, String tblName) {
         }
     }

-    public HadoopAuthenticator getAuthenticatorCache() {
-        return partitionValuesCache.get(key);
+    public HadoopAuthenticator getCachingKerberosAuthenticator() {
+        return authenticator;
     }

-
     public void invalidatePartitionCache(String dbName, String tblName, String partitionName) {
         PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, null);
         HivePartitionValues partitionValues = partitionValuesCache.getIfPresent(key);
@@ -1091,7 +1101,40 @@ private static boolean isGeneratedPath(String name) {

     @Data
     public static class AuthenticatorCacheKey {
         private String credentialName;
+
+        public AuthenticatorCacheKey(String credentialName) {
+            this.credentialName = credentialName;
+        }
+
+        public static AuthenticatorCacheKey createHadoopAuthKey(AuthenticationConfig config) {
+            if (config instanceof KerberosAuthenticationConfig) {
+                return new AuthenticatorCacheKey(((KerberosAuthenticationConfig) config).getKerberosPrincipal());
+            } else {
+                return new AuthenticatorCacheKey(((SimpleAuthenticationConfig) config).getUsername());
+            }
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof AuthenticatorCacheKey)) {
+                return false;
+            }
+            return credentialName.equals(((AuthenticatorCacheKey) obj).credentialName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(credentialName);
+        }
+
+        @Override
+        public String toString() {
+            return "AuthenticatorCacheKey{" + "credentialName=" + credentialName + '}';
+        }
     }

     @Data
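A condensed sketch of the pattern initAuthenticator wires up, written against plain Caffeine rather than Doris's CacheFactory wrapper (names here are hypothetical, not part of this commit): one UGI per credential key, reloaded on the TGT renew period so callers always see a reasonably fresh ticket.

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.time.Duration;
import org.apache.hadoop.security.UserGroupInformation;

public class UgiCacheSketch {
    private final LoadingCache<String, UserGroupInformation> cache;

    public UgiCacheSketch(long renewMs) {
        this.cache = Caffeine.newBuilder()
                .refreshAfterWrite(Duration.ofMillis(renewMs)) // background re-login after the renew period
                .expireAfterWrite(Duration.ofMillis(renewMs * 24))
                .maximumSize(100)
                .build(UgiCacheSketch::loginFor);
    }

    public UserGroupInformation ugiFor(String credentialName) {
        return cache.get(credentialName); // loads (logs in) on first access
    }

    private static UserGroupInformation loginFor(String credentialName) {
        // Stand-in for HadoopUGI.getHadoopAuthenticator(config).getUGI() in the real code.
        return UserGroupInformation.createRemoteUser(credentialName);
    }
}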
@@ -32,9 +32,7 @@
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
-import org.apache.doris.common.security.authentication.AuthenticationConfig;
 import org.apache.doris.common.security.authentication.HadoopAuthenticator;
-import org.apache.doris.common.security.authentication.HadoopUGI;
 import org.apache.doris.datasource.ExternalDatabase;
 import org.apache.doris.datasource.jdbc.client.JdbcClient;
 import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
@@ -63,11 +61,15 @@ public class HiveMetadataOps implements ExternalMetadataOps {
     private static final int MIN_CLIENT_POOL_SIZE = 8;
     private final HMSCachedClient client;
     private final HMSExternalCatalog catalog;
+    private HadoopAuthenticator hadoopAuthenticator;

     public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, HMSExternalCatalog catalog) {
         this(catalog, createCachedClient(hiveConf,
                 Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size),
                 jdbcClientConfig));
+        hadoopAuthenticator = Env.getCurrentEnv().getExtMetaCacheMgr().getMetaStoreCache(catalog)
+                .getCachingKerberosAuthenticator();
+        client.setHadoopAuthenticator(hadoopAuthenticator);
     }

     @VisibleForTesting
@@ -76,13 +78,6 @@ public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client) {
         this.client = client;
     }

-    public Optional<HadoopAuthenticator> getHadoopAuthenticator() {
-        if (client instanceof ThriftHMSCachedClient) {
-            return Optional.of(((ThriftHMSCachedClient) client).getHadoopAuthenticator());
-        }
-        return Optional.empty();
-    }
-
     public HMSCachedClient getClient() {
         return client;
     }
@@ -95,9 +90,6 @@ private static HMSCachedClient createCachedClient(HiveConf hiveConf, int thriftC
             JdbcClientConfig jdbcClientConfig) {
         if (hiveConf != null) {
             ThriftHMSCachedClient client = new ThriftHMSCachedClient(hiveConf, thriftClientPoolSize);
-            HadoopAuthenticator hadoopAuthenticator = HadoopUGI.getHadoopAuthenticator(
-                    AuthenticationConfig.getKerberosConfig(hiveConf));
-            client.setHadoopAuthenticator(hadoopAuthenticator);
             return client;
         }
         Preconditions.checkNotNull(jdbcClientConfig, "hiveConf and jdbcClientConfig are both null");
@@ -20,6 +20,7 @@
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.JdbcTable;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.DatabaseMetadata;
 import org.apache.doris.datasource.TableMetadata;
 import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
@@ -587,4 +588,9 @@ public void dropTable(String dbName, String tblName) {
     public String getCatalogLocation(String catalogName) {
         throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");
     }
+
+    @Override
+    public void setHadoopAuthenticator(HadoopAuthenticator hadoopAuthenticator) {
+        throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");
+    }
 }
@@ -106,10 +106,6 @@ public ThriftHMSCachedClient(HiveConf hiveConf, int poolSize) {
         this.isClosed = false;
     }

-    public HadoopAuthenticator getHadoopAuthenticator() {
-        return hadoopAuthenticator;
-    }
-
     public void setHadoopAuthenticator(HadoopAuthenticator hadoopAuthenticator) {
         this.hadoopAuthenticator = hadoopAuthenticator;
     }