Skip to content

Commit

Permalink
ZOOKEEPER-3037: Add JVMPauseMonitor
Browse files Browse the repository at this point in the history
https://issues.apache.org/jira/browse/ZOOKEEPER-3037

Author: Norbert Kalmar <[email protected]>

Reviewers: [email protected]

Closes apache#904 from nkalmar/ZOOKEEPER-3037 and squashes the following commits:

a610532 [Norbert Kalmar] ZOOKEEPER-3037 - add serviceStop() and improve unit tests
7d0baaa [Norbert Kalmar] ZOOKEEPER-3037 - refactor unit tests
97d2c61 [Norbert Kalmar] ZOOKEEPER-3037 - cite hadoop-common as source
3661389 [Norbert Kalmar] ZOOKEEPER-3037 - Add unit test and various improvements
f309757 [Norbert Kalmar] ZOOKEEPER-3037 - Add JvmPauseMonitor

(cherry picked from commit e9adf6e)
  • Loading branch information
nkalmar authored and Mate Szalay-Beko committed Feb 5, 2021
1 parent 29315f8 commit 9fab21a
Show file tree
Hide file tree
Showing 10 changed files with 451 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ public class ServerConfig {
/** defaults to -1 if not set explicitly */
protected int maxSessionTimeout = -1;

/** JVM Pause Monitor feature switch; false here, overwritten by readFrom(QuorumPeerConfig) with the quorum config's value */
protected boolean jvmPauseMonitorToRun = false;
/** JVM Pause Monitor warn threshold in ms; declared without an initializer, so it is 0 until readFrom(QuorumPeerConfig) assigns it */
protected long jvmPauseWarnThresholdMs;
/** JVM Pause Monitor info threshold in ms; declared without an initializer, so it is 0 until readFrom(QuorumPeerConfig) assigns it */
protected long jvmPauseInfoThresholdMs;
/** JVM Pause Monitor sleep time in ms; declared without an initializer, so it is 0 until readFrom(QuorumPeerConfig) assigns it */
protected long jvmPauseSleepTimeMs;

/**
* Parse arguments for server configuration
* @param args clientPort dataDir and optional tickTime and maxClientCnxns
Expand Down Expand Up @@ -99,6 +108,10 @@ public void readFrom(QuorumPeerConfig config) {
maxClientCnxns = config.getMaxClientCnxns();
minSessionTimeout = config.getMinSessionTimeout();
maxSessionTimeout = config.getMaxSessionTimeout();
jvmPauseMonitorToRun = config.isJvmPauseMonitorToRun();
jvmPauseInfoThresholdMs = config.getJvmPauseInfoThresholdMs();
jvmPauseWarnThresholdMs = config.getJvmPauseWarnThresholdMs();
jvmPauseSleepTimeMs = config.getJvmPauseSleepTimeMs();
}

public InetSocketAddress getClientPortAddress() {
Expand All @@ -115,4 +128,17 @@ public InetSocketAddress getSecureClientPortAddress() {
public int getMinSessionTimeout() { return minSessionTimeout; }
/**
 * Returns the maximum session timeout.
 *
 * @return maximum session timeout in milliseconds, -1 if unset
 */
public int getMaxSessionTimeout() {
    return this.maxSessionTimeout;
}

/**
 * Returns the JVM pause monitor info threshold.
 *
 * @return the current value of jvmPauseInfoThresholdMs, in milliseconds
 */
public long getJvmPauseInfoThresholdMs() {
    return this.jvmPauseInfoThresholdMs;
}
/**
 * Returns the JVM pause monitor warn threshold.
 *
 * @return the current value of jvmPauseWarnThresholdMs, in milliseconds
 */
public long getJvmPauseWarnThresholdMs() {
    return this.jvmPauseWarnThresholdMs;
}
/**
 * Returns the JVM pause monitor sleep time.
 *
 * @return the current value of jvmPauseSleepTimeMs, in milliseconds
 */
public long getJvmPauseSleepTimeMs() {
    return this.jvmPauseSleepTimeMs;
}
/**
 * Reports the state of the JVM pause monitor feature switch.
 *
 * @return the current value of jvmPauseMonitorToRun
 */
public boolean isJvmPauseMonitorToRun() {
    return this.jvmPauseMonitorToRun;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
import org.apache.zookeeper.server.util.JvmPauseMonitor;
import org.apache.zookeeper.txn.CreateSessionTxn;
import org.apache.zookeeper.txn.TxnHeader;
import org.slf4j.Logger;
Expand Down Expand Up @@ -109,6 +110,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
private final AtomicLong hzxid = new AtomicLong(0);
public final static Exception ok = new Exception("No prob");
protected RequestProcessor firstProcessor;
/**
 * Optional JVM pause monitor. Assigned by the constructor overload that
 * takes a JvmPauseMonitor; may be null, in which case
 * startJvmPauseMonitor() and shutdown(boolean) leave it alone.
 */
protected JvmPauseMonitor jvmPauseMonitor;
protected volatile State state = State.INITIAL;
protected boolean reconfigEnabled;

Expand Down Expand Up @@ -185,6 +187,20 @@ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
+ " snapdir " + txnLogFactory.getSnapDir());
}

/**
 * Builds a server by delegating to
 * {@link #ZooKeeperServer(FileTxnSnapLog, int, int, int, ZKDatabase, boolean)}
 * and then stores the supplied JVM pause monitor on the instance.
 *
 * @param jvmPauseMonitor monitor to store; may be {@code null}, in which
 *        case the field is left null and no log line is written
 * @param txnLogFactory forwarded unchanged to the delegated constructor
 * @param tickTime forwarded unchanged to the delegated constructor
 * @param minSessionTimeout forwarded unchanged to the delegated constructor
 * @param maxSessionTimeout forwarded unchanged to the delegated constructor
 * @param zkDb forwarded unchanged to the delegated constructor
 * @param reconfigEnabled forwarded unchanged to the delegated constructor
 */
public ZooKeeperServer(JvmPauseMonitor jvmPauseMonitor, FileTxnSnapLog txnLogFactory, int tickTime,
        int minSessionTimeout, int maxSessionTimeout, ZKDatabase zkDb, boolean reconfigEnabled) {
    this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, zkDb, reconfigEnabled);
    this.jvmPauseMonitor = jvmPauseMonitor;
    if (jvmPauseMonitor == null) {
        // Nothing was attached, so there is nothing to announce.
        return;
    }
    LOG.info("Added JvmPauseMonitor to server");
}

/**
* creates a zookeeperserver instance.
* @param txnLogFactory the file transaction snapshot logging class
Expand Down Expand Up @@ -475,10 +491,18 @@ public synchronized void startup() {

registerJMX();

startJvmPauseMonitor();

setState(State.RUNNING);
notifyAll();
}

/**
 * Starts the JVM pause monitor held by this server by calling its
 * serviceStart(); does nothing when no monitor is set.
 */
protected void startJvmPauseMonitor() {
    if (this.jvmPauseMonitor == null) {
        return;
    }
    this.jvmPauseMonitor.serviceStart();
}

protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this,
Expand Down Expand Up @@ -583,6 +607,9 @@ public synchronized void shutdown(boolean fullyShutDown) {
if (firstProcessor != null) {
firstProcessor.shutdown();
}
if(jvmPauseMonitor != null) {
jvmPauseMonitor.serviceStop();
}

if (zkDb != null) {
if (fullyShutDown) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.zookeeper.server.persistence.FileTxnSnapLog.DatadirException;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.apache.zookeeper.server.util.JvmPauseMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -123,8 +124,13 @@ public void runFromConfig(ServerConfig config)
// run() in this thread.
// create a file logger url from the command line args
txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog,
config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null, QuorumPeerConfig.isReconfigEnabled());
JvmPauseMonitor jvmPauseMonitor = null;
if(config.jvmPauseMonitorToRun) {
jvmPauseMonitor = new JvmPauseMonitor(config);
}
final ZooKeeperServer zkServer = new ZooKeeperServer(jvmPauseMonitor, txnLog,
config.tickTime, config.minSessionTimeout, config.maxSessionTimeout,
null, QuorumPeerConfig.isReconfigEnabled());
txnLog.setServerStats(zkServer.serverStats());

// Registers shutdown handler which will be used to know the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.ConfigUtils;
import org.apache.zookeeper.server.util.JvmPauseMonitor;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -130,6 +131,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
*/
private ZKDatabase zkDb;

/**
 * Optional JVM pause monitor. Assigned through setJvmPauseMonitor; may be
 * null, in which case startJvmPauseMonitor() and shutdown() leave it alone.
 */
private JvmPauseMonitor jvmPauseMonitor;
public static final class AddressTuple {
public final InetSocketAddress quorumAddr;
public final InetSocketAddress electionAddr;
Expand Down Expand Up @@ -450,6 +452,10 @@ public int getQuorumSize(){
return getVotingView().size();
}

/**
 * Stores the JVM pause monitor for this peer. start() starts the stored
 * monitor and shutdown() stops it; both skip it while the field is null.
 *
 * @param jvmPauseMonitor the monitor to store; stored as-is, no null check is done here
 */
public void setJvmPauseMonitor(JvmPauseMonitor jvmPauseMonitor) {
this.jvmPauseMonitor = jvmPauseMonitor;
}

/**
* QuorumVerifier implementation; default (majority).
*/
Expand Down Expand Up @@ -896,6 +902,7 @@ public synchronized void start() {
System.out.println(e);
}
startLeaderElection();
startJvmPauseMonitor();
super.start();
}

Expand Down Expand Up @@ -974,6 +981,12 @@ synchronized public void startLeaderElection() {
this.electionAlg = createElectionAlgorithm(electionType);
}

/**
 * Starts the JVM pause monitor held by this peer by calling its
 * serviceStart(); does nothing when no monitor has been set.
 */
private void startJvmPauseMonitor() {
    if (this.jvmPauseMonitor == null) {
        return;
    }
    this.jvmPauseMonitor.serviceStart();
}

/**
* Count the number of nodes in the map that could be followers.
* @param peers
Expand Down Expand Up @@ -1333,6 +1346,9 @@ public void shutdown() {
if(udpSocket != null) {
udpSocket.close();
}
if(jvmPauseMonitor != null) {
jvmPauseMonitor.serviceStop();
}

try {
adminServer.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.common.ClientX509Util;
import org.apache.zookeeper.common.StringUtils;
import org.apache.zookeeper.server.util.JvmPauseMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
Expand Down Expand Up @@ -117,6 +118,23 @@ public class QuorumPeerConfig {
*/
private final int MIN_SNAP_RETAIN_COUNT = 3;

/**
 * JVM Pause Monitor feature switch. Off by default; parseProperties sets it
 * with Boolean.parseBoolean from the property named by
 * JvmPauseMonitor.JVM_PAUSE_MONITOR_FEATURE_SWITCH_KEY.
 */
protected boolean jvmPauseMonitorToRun = false;
/**
 * JVM Pause Monitor warn threshold in ms. Starts at
 * JvmPauseMonitor.WARN_THRESHOLD_DEFAULT; parseProperties overrides it with
 * Long.parseLong from the property named by JvmPauseMonitor.WARN_THRESHOLD_KEY.
 */
protected long jvmPauseWarnThresholdMs = JvmPauseMonitor.WARN_THRESHOLD_DEFAULT;
/**
 * JVM Pause Monitor info threshold in ms. Starts at
 * JvmPauseMonitor.INFO_THRESHOLD_DEFAULT; parseProperties overrides it with
 * Long.parseLong from the property named by JvmPauseMonitor.INFO_THRESHOLD_KEY.
 */
protected long jvmPauseInfoThresholdMs = JvmPauseMonitor.INFO_THRESHOLD_DEFAULT;
/**
 * JVM Pause Monitor sleep time in ms. Starts at
 * JvmPauseMonitor.SLEEP_TIME_MS_DEFAULT; parseProperties overrides it with
 * Long.parseLong from the property named by JvmPauseMonitor.SLEEP_TIME_MS_KEY.
 */
protected long jvmPauseSleepTimeMs = JvmPauseMonitor.SLEEP_TIME_MS_DEFAULT;

@SuppressWarnings("serial")
public static class ConfigException extends Exception {
public ConfigException(String msg) {
Expand Down Expand Up @@ -331,6 +349,14 @@ public void parseProperties(Properties zkProp)
quorumServicePrincipal = value;
} else if (key.equals("quorum.cnxn.threads.size")) {
quorumCnxnThreadsSize = Integer.parseInt(value);
} else if (key.equals(JvmPauseMonitor.INFO_THRESHOLD_KEY)) {
jvmPauseInfoThresholdMs = Long.parseLong(value);
} else if (key.equals(JvmPauseMonitor.WARN_THRESHOLD_KEY)) {
jvmPauseWarnThresholdMs = Long.parseLong(value);
} else if (key.equals(JvmPauseMonitor.SLEEP_TIME_MS_KEY)) {
jvmPauseSleepTimeMs = Long.parseLong(value);
} else if (key.equals(JvmPauseMonitor.JVM_PAUSE_MONITOR_FEATURE_SWITCH_KEY)) {
jvmPauseMonitorToRun = Boolean.parseBoolean(value);
} else {
System.setProperty("zookeeper." + key, value);
}
Expand Down Expand Up @@ -790,6 +816,19 @@ public Map<Long,QuorumServer> getServers() {
return Collections.unmodifiableMap(quorumVerifier.getAllMembers());
}

/**
 * Returns the JVM pause monitor info threshold.
 *
 * @return the current value of jvmPauseInfoThresholdMs, in milliseconds
 */
public long getJvmPauseInfoThresholdMs() {
    return this.jvmPauseInfoThresholdMs;
}
/**
 * Returns the JVM pause monitor warn threshold.
 *
 * @return the current value of jvmPauseWarnThresholdMs, in milliseconds
 */
public long getJvmPauseWarnThresholdMs() {
    return this.jvmPauseWarnThresholdMs;
}
/**
 * Returns the JVM pause monitor sleep time.
 *
 * @return the current value of jvmPauseSleepTimeMs, in milliseconds
 */
public long getJvmPauseSleepTimeMs() {
    return this.jvmPauseSleepTimeMs;
}
/**
 * Reports the state of the JVM pause monitor feature switch.
 *
 * @return the current value of jvmPauseMonitorToRun
 */
public boolean isJvmPauseMonitorToRun() {
    return this.jvmPauseMonitorToRun;
}

/**
 * Returns the server id held by this configuration.
 *
 * @return the current value of serverId
 */
public long getServerId() {
    return this.serverId;
}

public boolean isDistributed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import javax.security.sasl.SaslException;

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.server.util.JvmPauseMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.jmx.ManagedUtil;
Expand Down Expand Up @@ -202,6 +203,10 @@ public void runFromConfig(QuorumPeerConfig config)
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();

if(config.jvmPauseMonitorToRun) {
quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
}

quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
Expand Down
Loading

0 comments on commit 9fab21a

Please sign in to comment.