Skip to content

Commit

Permalink
HDFS-15383. RBF: Add support for router delegation token without watch (
Browse files Browse the repository at this point in the history
apache#2047)

Improves the router's performance for delegation-token operations by removing the router's watchers on tokens. In our experience, a huge number of watches inside ZooKeeper degrades ZooKeeper's performance severely; the current limit is about 1.2-1.5 million watches.
  • Loading branch information
fengnanli authored Jun 23, 2020
1 parent 03f855e commit 84110d8
Show file tree
Hide file tree
Showing 5 changed files with 498 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import java.io.IOException;
import java.security.MessageDigest;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import javax.crypto.SecretKey;

Expand Down Expand Up @@ -63,7 +63,7 @@ private String formatTokenId(TokenIdent id) {
* to DelegationTokenInformation. Protected by this object lock.
*/
protected final Map<TokenIdent, DelegationTokenInformation> currentTokens
= new HashMap<TokenIdent, DelegationTokenInformation>();
= new ConcurrentHashMap<>();

/**
* Sequence number to create DelegationTokenIdentifier.
Expand All @@ -75,7 +75,7 @@ private String formatTokenId(TokenIdent id) {
* Access to allKeys is protected by this object lock
*/
protected final Map<Integer, DelegationKey> allKeys
= new HashMap<Integer, DelegationKey>();
= new ConcurrentHashMap<>();

/**
* Access to currentId is protected by this object lock.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
import static org.apache.hadoop.util.Time.now;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
Expand All @@ -79,7 +80,7 @@
public abstract class ZKDelegationTokenSecretManager<TokenIdent extends AbstractDelegationTokenIdentifier>
extends AbstractDelegationTokenSecretManager<TokenIdent> {

private static final String ZK_CONF_PREFIX = "zk-dt-secret-manager.";
public static final String ZK_CONF_PREFIX = "zk-dt-secret-manager.";
public static final String ZK_DTSM_ZK_NUM_RETRIES = ZK_CONF_PREFIX
+ "zkNumRetries";
public static final String ZK_DTSM_ZK_SESSION_TIMEOUT = ZK_CONF_PREFIX
Expand All @@ -100,6 +101,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
+ "kerberos.principal";
public static final String ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE = ZK_CONF_PREFIX
+ "token.seqnum.batch.size";
// Configuration key that controls whether this secret manager starts a
// PathChildrenCache (with its listener) on the tokens root in ZooKeeper.
// The value is read once in the constructor via conf.getBoolean().
public static final String ZK_DTSM_TOKEN_WATCHER_ENABLED = ZK_CONF_PREFIX
+ "token.watcher.enabled";
// Default keeps the token cache/listener enabled.
public static final boolean ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT = true;

public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3;
public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000;
Expand All @@ -118,7 +122,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
private static final String ZK_DTSM_NAMESPACE = "ZKDTSMRoot";
private static final String ZK_DTSM_SEQNUM_ROOT = "/ZKDTSMSeqNumRoot";
private static final String ZK_DTSM_KEYID_ROOT = "/ZKDTSMKeyIdRoot";
private static final String ZK_DTSM_TOKENS_ROOT = "/ZKDTSMTokensRoot";
protected static final String ZK_DTSM_TOKENS_ROOT = "/ZKDTSMTokensRoot";
private static final String ZK_DTSM_MASTER_KEY_ROOT = "/ZKDTSMMasterKeyRoot";

private static final String DELEGATION_KEY_PREFIX = "DK_";
Expand All @@ -132,7 +136,7 @@ public static void setCurator(CuratorFramework curator) {
}

private final boolean isExternalClient;
private final CuratorFramework zkClient;
protected final CuratorFramework zkClient;
private SharedCount delTokSeqCounter;
private SharedCount keyIdSeqCounter;
private PathChildrenCache keyCache;
Expand All @@ -143,6 +147,8 @@ public static void setCurator(CuratorFramework curator) {
private int currentSeqNum;
private int currentMaxSeqNum;

private final boolean isTokenWatcherEnabled;

public ZKDelegationTokenSecretManager(Configuration conf) {
super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL,
DelegationTokenManager.UPDATE_INTERVAL_DEFAULT) * 1000,
Expand All @@ -156,6 +162,8 @@ public ZKDelegationTokenSecretManager(Configuration conf) {
ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT);
seqNumBatchSize = conf.getInt(ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE,
ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);
isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED,
ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT);
if (CURATOR_TL.get() != null) {
zkClient =
CURATOR_TL.get().usingNamespace(
Expand Down Expand Up @@ -383,34 +391,37 @@ public void childEvent(CuratorFramework client,
} catch (Exception e) {
throw new IOException("Could not start PathChildrenCache for keys", e);
}
try {
tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true);
if (tokenCache != null) {
tokenCache.start(StartMode.BUILD_INITIAL_CACHE);
tokenCache.getListenable().addListener(new PathChildrenCacheListener() {

@Override
public void childEvent(CuratorFramework client,
PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
processTokenAddOrUpdate(event.getData());
break;
case CHILD_UPDATED:
processTokenAddOrUpdate(event.getData());
break;
case CHILD_REMOVED:
processTokenRemoved(event.getData());
break;
default:
break;
if (isTokenWatcherEnabled) {
LOG.info("TokenCache is enabled");
try {
tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true);
if (tokenCache != null) {
tokenCache.start(StartMode.BUILD_INITIAL_CACHE);
tokenCache.getListenable().addListener(new PathChildrenCacheListener() {

@Override
public void childEvent(CuratorFramework client,
PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
processTokenAddOrUpdate(event.getData().getData());
break;
case CHILD_UPDATED:
processTokenAddOrUpdate(event.getData().getData());
break;
case CHILD_REMOVED:
processTokenRemoved(event.getData());
break;
default:
break;
}
}
}
}, listenerThreadPool);
loadFromZKCache(true);
}, listenerThreadPool);
loadFromZKCache(true);
}
} catch (Exception e) {
throw new IOException("Could not start PathChildrenCache for tokens", e);
}
} catch (Exception e) {
throw new IOException("Could not start PathChildrenCache for tokens", e);
}
super.startThreads();
}
Expand All @@ -435,7 +446,7 @@ private void loadFromZKCache(final boolean isTokenCache) {
for (ChildData child : children) {
try {
if (isTokenCache) {
processTokenAddOrUpdate(child);
processTokenAddOrUpdate(child.getData());
} else {
processKeyAddOrUpdate(child.getData());
}
Expand All @@ -457,9 +468,7 @@ private void processKeyAddOrUpdate(byte[] data) throws IOException {
DataInputStream din = new DataInputStream(bin);
DelegationKey key = new DelegationKey();
key.readFields(din);
synchronized (this) {
allKeys.put(key.getKeyId(), key);
}
allKeys.put(key.getKeyId(), key);
}

private void processKeyRemoved(String path) {
Expand All @@ -469,15 +478,13 @@ private void processKeyRemoved(String path) {
int j = tokSeg.indexOf('_');
if (j > 0) {
int keyId = Integer.parseInt(tokSeg.substring(j + 1));
synchronized (this) {
allKeys.remove(keyId);
}
allKeys.remove(keyId);
}
}
}

private void processTokenAddOrUpdate(ChildData data) throws IOException {
ByteArrayInputStream bin = new ByteArrayInputStream(data.getData());
protected TokenIdent processTokenAddOrUpdate(byte[] data) throws IOException {
ByteArrayInputStream bin = new ByteArrayInputStream(data);
DataInputStream din = new DataInputStream(bin);
TokenIdent ident = createIdentifier();
ident.readFields(din);
Expand All @@ -488,24 +495,18 @@ private void processTokenAddOrUpdate(ChildData data) throws IOException {
if (numRead > -1) {
DelegationTokenInformation tokenInfo =
new DelegationTokenInformation(renewDate, password);
synchronized (this) {
currentTokens.put(ident, tokenInfo);
// The cancel task might be waiting
notifyAll();
}
currentTokens.put(ident, tokenInfo);
return ident;
}
return null;
}

/**
 * Handles removal of a token node: deserializes the token identifier from
 * the node's payload and drops the matching entry from
 * {@code currentTokens}.
 *
 * @param data child node data whose bytes begin with the serialized
 *             token identifier
 * @throws IOException if the identifier fields cannot be read from the data
 */
private void processTokenRemoved(ChildData data) throws IOException {
ByteArrayInputStream bin = new ByteArrayInputStream(data.getData());
DataInputStream din = new DataInputStream(bin);
// Only the identifier is read here; it is the key into currentTokens.
TokenIdent ident = createIdentifier();
ident.readFields(din);
synchronized (this) {
currentTokens.remove(ident);
// The cancel task might be waiting
notifyAll();
}
currentTokens.remove(ident);
}

@Override
Expand Down Expand Up @@ -706,7 +707,7 @@ protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
*
* @param ident Identifier of the token
*/
private synchronized void syncLocalCacheWithZk(TokenIdent ident) {
protected void syncLocalCacheWithZk(TokenIdent ident) {
try {
DelegationTokenInformation tokenInfo = getTokenInfoFromZK(ident);
if (tokenInfo != null && !currentTokens.containsKey(ident)) {
Expand All @@ -720,16 +721,21 @@ private synchronized void syncLocalCacheWithZk(TokenIdent ident) {
}
}

private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident)
protected DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident)
throws IOException {
return getTokenInfoFromZK(ident, false);
}

private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident,
protected DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident,
boolean quiet) throws IOException {
String nodePath =
getNodePath(ZK_DTSM_TOKENS_ROOT,
DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber());
return getTokenInfoFromZK(nodePath, quiet);
}

protected DelegationTokenInformation getTokenInfoFromZK(String nodePath,
boolean quiet) throws IOException {
try {
byte[] data = zkClient.getData().forPath(nodePath);
if ((data == null) || (data.length == 0)) {
Expand Down Expand Up @@ -864,15 +870,30 @@ protected void updateToken(TokenIdent ident,
/**
 * Removes the stored token for the given identifier.
 *
 * Delegates to {@code removeStoredToken(ident, false)}, i.e. with
 * {@code checkAgainstZkBeforeDeletion} disabled.
 *
 * @param ident identifier of the token to remove
 * @throws IOException if the delegated removal fails
 */
@Override
protected void removeStoredToken(TokenIdent ident)
throws IOException {
removeStoredToken(ident, false);
}

protected void removeStoredToken(TokenIdent ident,
boolean checkAgainstZkBeforeDeletion) throws IOException {
String nodeRemovePath =
getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
+ ident.getSequenceNumber());
if (LOG.isDebugEnabled()) {
LOG.debug("Removing ZKDTSMDelegationToken_"
+ ident.getSequenceNumber());
}
try {
if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
DelegationTokenInformation dtInfo = getTokenInfoFromZK(ident, true);
if (dtInfo != null) {
// For the case there is no sync or watch miss, it is possible that the
// local storage has expired tokens which have been renewed by peer
// so double check again to avoid accidental delete
if (checkAgainstZkBeforeDeletion
&& dtInfo.getRenewDate() > now()) {
LOG.info("Node already renewed by peer " + nodeRemovePath +
" so this token should not be deleted");
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Removing ZKDTSMDelegationToken_"
+ ident.getSequenceNumber());
}
while(zkClient.checkExists().forPath(nodeRemovePath) != null){
try {
zkClient.delete().guaranteed().forPath(nodeRemovePath);
Expand All @@ -895,7 +916,7 @@ protected void removeStoredToken(TokenIdent ident)
}

@Override
public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
public TokenIdent cancelToken(Token<TokenIdent> token,
String canceller) throws IOException {
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
DataInputStream in = new DataInputStream(buf);
Expand All @@ -906,7 +927,7 @@ public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
return super.cancelToken(token, canceller);
}

private void addOrUpdateToken(TokenIdent ident,
protected void addOrUpdateToken(TokenIdent ident,
DelegationTokenInformation info, boolean isUpdate) throws Exception {
String nodeCreatePath =
getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
Expand All @@ -933,6 +954,10 @@ private void addOrUpdateToken(TokenIdent ident,
}
}

/**
 * Reports whether the token watcher is enabled for this secret manager.
 *
 * The flag is a final field set in the constructor from the
 * {@code ZK_DTSM_TOKEN_WATCHER_ENABLED} configuration key, so the returned
 * value does not change after construction.
 *
 * @return {@code true} if the token watcher is enabled, {@code false}
 *         otherwise
 */
public boolean isTokenWatcherEnabled() {
return isTokenWatcherEnabled;
}

/**
* Simple implementation of an {@link ACLProvider} that simply returns an ACL
* that gives all permissions only to a single principal.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ public class TestZKDelegationTokenSecretManager {
private static final Logger LOG =
LoggerFactory.getLogger(TestZKDelegationTokenSecretManager.class);

private static final int TEST_RETRIES = 2;
protected static final int TEST_RETRIES = 2;

private static final int RETRY_COUNT = 5;
protected static final int RETRY_COUNT = 5;

private static final int RETRY_WAIT = 1000;
protected static final int RETRY_WAIT = 1000;

private static final long DAY_IN_SECS = 86400;
protected static final long DAY_IN_SECS = 86400;

private TestingServer zkServer;
protected TestingServer zkServer;

@Rule
public Timeout globalTimeout = new Timeout(300000);
Expand Down Expand Up @@ -425,7 +425,7 @@ private void verifyACL(CuratorFramework curatorFramework,
// cancelled but.. that would mean having to make an RPC call for every
// verification request.
// Thus, the eventual consistency trade-off should be acceptable here...
private void verifyTokenFail(DelegationTokenManager tm,
protected void verifyTokenFail(DelegationTokenManager tm,
Token<DelegationTokenIdentifier> token) throws IOException,
InterruptedException {
verifyTokenFailWithRetry(tm, token, RETRY_COUNT);
Expand Down
Loading

0 comments on commit 84110d8

Please sign in to comment.