Skip to content

Commit

Permalink
HBASE-22638 ZooKeeper Utility enhancements
Browse files Browse the repository at this point in the history
Signed-off-by: Jan Hentschel <[email protected]>
  • Loading branch information
virajjasani authored and HorizonNet committed Jul 13, 2019
1 parent 84ee724 commit 74ae25d
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,21 @@ public class MiniZooKeeperCluster {

private static final int TICK_TIME = 2000;
private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
private int connectionTimeout;
private final int connectionTimeout;

private boolean started;

/** The default port. If zero, we use a random port. */
private int defaultClientPort = 0;

private List<NIOServerCnxnFactory> standaloneServerFactoryList;
private List<ZooKeeperServer> zooKeeperServers;
private List<Integer> clientPortList;
private final List<NIOServerCnxnFactory> standaloneServerFactoryList;
private final List<ZooKeeperServer> zooKeeperServers;
private final List<Integer> clientPortList;

private int activeZKServerIndex;
private int tickTime = 0;

private Configuration configuration;
private final Configuration configuration;

public MiniZooKeeperCluster() {
this(new Configuration());
Expand Down Expand Up @@ -96,6 +96,7 @@ public void addClientPort(int clientPort) {

/**
* Get the list of client ports.
*
* @return clientPortList the client port list
*/
@VisibleForTesting
Expand Down Expand Up @@ -141,18 +142,16 @@ private int selectClientPort(int seedPort) {
}
}
// Make sure that the port is unused.
while (true) {
// break when an unused port is found
do {
for (i = 0; i < clientPortList.size(); i++) {
if (returnClientPort == clientPortList.get(i)) {
// Already used. Update the port and retry.
returnClientPort++;
break;
}
}
if (i == clientPortList.size()) {
break; // found a unused port, exit
}
}
} while (i != clientPortList.size());
return returnClientPort;
}

Expand All @@ -161,7 +160,7 @@ public void setTickTime(int tickTime) {
}

public int getBackupZooKeeperServerNum() {
return zooKeeperServers.size()-1;
return zooKeeperServers.size() - 1;
}

public int getZooKeeperServerNum() {
Expand All @@ -177,7 +176,7 @@ private static void setupTestEnv() {
System.setProperty("zookeeper.preAllocSize", "100");
FileTxnLog.setPreallocSize(100 * 1024);
// allow all 4 letter words
System.setProperty("zookeeper.4lw.commands.whitelist","*");
System.setProperty("zookeeper.4lw.commands.whitelist", "*");
}

public int startup(File baseDir) throws IOException, InterruptedException {
Expand Down Expand Up @@ -210,7 +209,7 @@ public int startup(File baseDir, int numZooKeeperServers) throws IOException,

// running all the ZK servers
for (int i = 0; i < numZooKeeperServers; i++) {
File dir = new File(baseDir, "zookeeper_"+i).getAbsoluteFile();
File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile();
createDir(dir);
int tickTimeToUse;
if (this.tickTime > 0) {
Expand Down Expand Up @@ -266,8 +265,7 @@ public int startup(File baseDir, int numZooKeeperServers) throws IOException,
// We have selected a port as a client port. Update clientPortList if necessary.
if (clientPortList.size() <= i) { // it is not in the list, add the port
clientPortList.add(currentClientPort);
}
else if (clientPortList.get(i) <= 0) { // the list has invalid port, update with valid port
} else if (clientPortList.get(i) <= 0) { // the list has invalid port, update with valid port
clientPortList.remove(i);
clientPortList.add(i, currentClientPort);
}
Expand Down Expand Up @@ -403,13 +401,10 @@ private static boolean waitForServerDown(int port, long timeout) throws IOExcept
long start = System.currentTimeMillis();
while (true) {
try {
Socket sock = new Socket("localhost", port);
try {
try (Socket sock = new Socket("localhost", port)) {
OutputStream outstream = sock.getOutputStream();
outstream.write("stat".getBytes());
outstream.flush();
} finally {
sock.close();
}
} catch (IOException e) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ public class RecoverableZooKeeper {
// An identifier of this process in the cluster
private final String identifier;
private final byte[] id;
private Watcher watcher;
private int sessionTimeout;
private String quorumServers;
private final Watcher watcher;
private final int sessionTimeout;
private final String quorumServers;

public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,10 @@ private static void resetAcls(final ZKWatcher zkw, final String znode,

private static void resetAcls(final Configuration conf, boolean eraseAcls)
throws Exception {
ZKWatcher zkw = new ZKWatcher(conf, "ZKAclReset", null);
try {
try (ZKWatcher zkw = new ZKWatcher(conf, "ZKAclReset", null)) {
LOG.info((eraseAcls ? "Erase" : "Set") + " HBase ACLs for " +
zkw.getQuorum() + " " + zkw.getZNodePaths().baseZNode);
zkw.getQuorum() + " " + zkw.getZNodePaths().baseZNode);
resetAcls(zkw, zkw.getZNodePaths().baseZNode, eraseAcls);
} finally {
zkw.close();
}
}

Expand All @@ -96,13 +93,20 @@ private void printUsageAndExit() {
public int run(String[] args) throws Exception {
boolean eraseAcls = true;

for (int i = 0; i < args.length; ++i) {
if (args[i].equals("-help")) {
printUsageAndExit();
} else if (args[i].equals("-set-acls")) {
eraseAcls = false;
} else {
printUsageAndExit();
for (String arg : args) {
switch (arg) {
case "-help": {
printUsageAndExit();
break;
}
case "-set-acls": {
eraseAcls = false;
break;
}
default: {
printUsageAndExit();
break;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
*/
@InterfaceAudience.Private
public class ZKClusterId {
private ZKWatcher watcher;
private Abortable abortable;
private final ZKWatcher watcher;
private final Abortable abortable;
private String id;

public ZKClusterId(ZKWatcher watcher, Abortable abortable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public class ZKLeaderManager extends ZKListener {

private final Object lock = new Object();
private final AtomicBoolean leaderExists = new AtomicBoolean();
private String leaderZNode;
private byte[] nodeId;
private Stoppable candidate;
private final String leaderZNode;
private final byte[] nodeId;
private final Stoppable candidate;

public ZKLeaderManager(ZKWatcher watcher, String leaderZNode,
byte[] identifier, Stoppable candidate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ private static class JaasConfiguration extends javax.security.auth.login.Configu
private static final Map<String, String> BASIC_JAAS_OPTIONS = new HashMap<>();
static {
String jaasEnvVar = System.getenv("HBASE_JAAS_DEBUG");
if (jaasEnvVar != null && "true".equalsIgnoreCase(jaasEnvVar)) {
if ("true".equalsIgnoreCase(jaasEnvVar)) {
BASIC_JAAS_OPTIONS.put("debug", "true");
}
}
Expand Down Expand Up @@ -351,7 +351,7 @@ public static boolean watchAndCheckExists(ZKWatcher zkw, String znode)
throws KeeperException {
try {
Stat s = zkw.getRecoverableZooKeeper().exists(znode, zkw);
boolean exists = s != null ? true : false;
boolean exists = s != null;
if (exists) {
LOG.debug(zkw.prefix("Set watcher on existing znode=" + znode));
} else {
Expand Down Expand Up @@ -441,8 +441,7 @@ public static List<String> listChildrenAndWatchForNewChildren(
ZKWatcher zkw, String znode)
throws KeeperException {
try {
List<String> children = zkw.getRecoverableZooKeeper().getChildren(znode, zkw);
return children;
return zkw.getRecoverableZooKeeper().getChildren(znode, zkw);
} catch(KeeperException.NoNodeException ke) {
LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " +
"because node does not exist (not an error)"));
Expand Down Expand Up @@ -1744,9 +1743,12 @@ public static String dump(ZKWatcher zkw) {
sb.append("<<FAILED LOOKUP: " + e.getMessage() + ">>");
}
sb.append("\nBackup master addresses:");
for (String child : listChildrenNoWatch(zkw,
zkw.getZNodePaths().backupMasterAddressesZNode)) {
sb.append("\n ").append(child);
final List<String> backupMasterChildrenNoWatchList = listChildrenNoWatch(zkw,
zkw.getZNodePaths().backupMasterAddressesZNode);
if (backupMasterChildrenNoWatchList != null) {
for (String child : backupMasterChildrenNoWatchList) {
sb.append("\n ").append(child);
}
}
sb.append("\nRegion server holding hbase:meta: "
+ new MetaTableLocator().getMetaRegionLocation(zkw));
Expand All @@ -1758,8 +1760,12 @@ public static String dump(ZKWatcher zkw) {
+ new MetaTableLocator().getMetaRegionLocation(zkw, i));
}
sb.append("\nRegion servers:");
for (String child : listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode)) {
sb.append("\n ").append(child);
final List<String> rsChildrenNoWatchList =
listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
if (rsChildrenNoWatchList != null) {
for (String child : rsChildrenNoWatchList) {
sb.append("\n ").append(child);
}
}
try {
getReplicationZnodesDump(zkw, sb);
Expand Down Expand Up @@ -1810,31 +1816,33 @@ private static void getReplicationZnodesDump(ZKWatcher zkw, StringBuilder sb)
// do a ls -r on this znode
sb.append("\n").append(replicationZnode).append(": ");
List<String> children = ZKUtil.listChildrenNoWatch(zkw, replicationZnode);
Collections.sort(children);
for (String child : children) {
String znode = ZNodePaths.joinZNode(replicationZnode, child);
if (znode.equals(zkw.getZNodePaths().peersZNode)) {
appendPeersZnodes(zkw, znode, sb);
} else if (znode.equals(zkw.getZNodePaths().queuesZNode)) {
appendRSZnodes(zkw, znode, sb);
} else if (znode.equals(zkw.getZNodePaths().hfileRefsZNode)) {
appendHFileRefsZnodes(zkw, znode, sb);
if (children != null) {
Collections.sort(children);
for (String child : children) {
String zNode = ZNodePaths.joinZNode(replicationZnode, child);
if (zNode.equals(zkw.getZNodePaths().peersZNode)) {
appendPeersZnodes(zkw, zNode, sb);
} else if (zNode.equals(zkw.getZNodePaths().queuesZNode)) {
appendRSZnodes(zkw, zNode, sb);
} else if (zNode.equals(zkw.getZNodePaths().hfileRefsZNode)) {
appendHFileRefsZNodes(zkw, zNode, sb);
}
}
}
}

private static void appendHFileRefsZnodes(ZKWatcher zkw, String hfileRefsZnode,
private static void appendHFileRefsZNodes(ZKWatcher zkw, String hFileRefsZNode,
StringBuilder sb) throws KeeperException {
sb.append("\n").append(hfileRefsZnode).append(": ");
for (String peerIdZnode : ZKUtil.listChildrenNoWatch(zkw, hfileRefsZnode)) {
String znodeToProcess = ZNodePaths.joinZNode(hfileRefsZnode, peerIdZnode);
sb.append("\n").append(znodeToProcess).append(": ");
List<String> peerHFileRefsZnodes = ZKUtil.listChildrenNoWatch(zkw, znodeToProcess);
int size = peerHFileRefsZnodes.size();
for (int i = 0; i < size; i++) {
sb.append(peerHFileRefsZnodes.get(i));
if (i != size - 1) {
sb.append(", ");
sb.append("\n").append(hFileRefsZNode).append(": ");
final List<String> hFileRefChildrenNoWatchList =
ZKUtil.listChildrenNoWatch(zkw, hFileRefsZNode);
if (hFileRefChildrenNoWatchList != null) {
for (String peerIdZNode : hFileRefChildrenNoWatchList) {
String zNodeToProcess = ZNodePaths.joinZNode(hFileRefsZNode, peerIdZNode);
sb.append("\n").append(zNodeToProcess).append(": ");
List<String> peerHFileRefsZNodes = ZKUtil.listChildrenNoWatch(zkw, zNodeToProcess);
if (peerHFileRefsZNodes != null) {
sb.append(String.join(", ", peerHFileRefsZNodes));
}
}
}
Expand Down Expand Up @@ -1946,10 +1954,10 @@ private static void appendPeerState(ZKWatcher zkw, String znodeToProcess, String
* @return The array of response strings.
* @throws IOException When the socket communication fails.
*/
public static String[] getServerStats(String server, int timeout)
private static String[] getServerStats(String server, int timeout)
throws IOException {
String[] sp = server.split(":");
if (sp == null || sp.length == 0) {
if (sp.length == 0) {
return null;
}

Expand Down Expand Up @@ -2085,7 +2093,7 @@ public static void logZKTree(ZKWatcher zkw, String root) {
* @see #logZKTree(ZKWatcher, String)
* @throws KeeperException if an unexpected exception occurs
*/
protected static void logZKTree(ZKWatcher zkw, String root, String prefix)
private static void logZKTree(ZKWatcher zkw, String root, String prefix)
throws KeeperException {
List<String> children = ZKUtil.listChildrenNoWatch(zkw, root);

Expand Down

0 comments on commit 74ae25d

Please sign in to comment.