Skip to content

Commit

Permalink
ZOOKEEPER-3037: Add JVMPauseMonitor
Browse files Browse the repository at this point in the history
https://issues.apache.org/jira/browse/ZOOKEEPER-3037

Author: Norbert Kalmar <[email protected]>

Reviewers: [email protected]

Closes apache#904 from nkalmar/ZOOKEEPER-3037 and squashes the following commits:

a610532 [Norbert Kalmar] ZOOKEEPER-3037 - add serviceStop() and improve unit tests
7d0baaa [Norbert Kalmar] ZOOKEEPER-3037 - refactor unit tests
97d2c61 [Norbert Kalmar] ZOOKEEPER-3037 - cite hadoop-common as source
3661389 [Norbert Kalmar] ZOOKEEPER-3037 - Add unit test and various improvements
f309757 [Norbert Kalmar] ZOOKEEPER-3037 - Add JvmPauseMonitor

(cherry picked from commit e9adf6e)
Change-Id: I7654fc85168bf163bdbbe96ebf73cb9ccf265e81
  • Loading branch information
nkalmar authored and Mate Szalay-Beko committed Feb 9, 2021
1 parent 742c487 commit e61f059
Show file tree
Hide file tree
Showing 10 changed files with 454 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ public class ServerConfig {
/** defaults to -1 if not set explicitly */
protected int maxSessionTimeout = -1;

/** JVM Pause Monitor feature switch */
protected boolean jvmPauseMonitorToRun = false;
/** JVM Pause Monitor warn threshold in ms */
protected long jvmPauseWarnThresholdMs;
/** JVM Pause Monitor info threshold in ms */
protected long jvmPauseInfoThresholdMs;
/** JVM Pause Monitor sleep time in ms */
protected long jvmPauseSleepTimeMs;

/**
* Parse arguments for server configuration
* @param args clientPort dataDir and optional tickTime and maxClientCnxns
Expand Down Expand Up @@ -99,6 +108,10 @@ public void readFrom(QuorumPeerConfig config) {
maxClientCnxns = config.getMaxClientCnxns();
minSessionTimeout = config.getMinSessionTimeout();
maxSessionTimeout = config.getMaxSessionTimeout();
jvmPauseMonitorToRun = config.isJvmPauseMonitorToRun();
jvmPauseInfoThresholdMs = config.getJvmPauseInfoThresholdMs();
jvmPauseWarnThresholdMs = config.getJvmPauseWarnThresholdMs();
jvmPauseSleepTimeMs = config.getJvmPauseSleepTimeMs();
}

public InetSocketAddress getClientPortAddress() {
Expand All @@ -115,4 +128,17 @@ public InetSocketAddress getSecureClientPortAddress() {
public int getMinSessionTimeout() { return minSessionTimeout; }
/** maximum session timeout in milliseconds, -1 if unset */
public int getMaxSessionTimeout() { return maxSessionTimeout; }

public long getJvmPauseInfoThresholdMs() {
return jvmPauseInfoThresholdMs;
}
public long getJvmPauseWarnThresholdMs() {
return jvmPauseWarnThresholdMs;
}
public long getJvmPauseSleepTimeMs() {
return jvmPauseSleepTimeMs;
}
public boolean isJvmPauseMonitorToRun() {
return jvmPauseMonitorToRun;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
import org.apache.zookeeper.server.util.JvmPauseMonitor;
import org.apache.zookeeper.txn.CreateSessionTxn;
import org.apache.zookeeper.txn.TxnHeader;
import org.slf4j.Logger;
Expand Down Expand Up @@ -109,6 +110,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
private final AtomicLong hzxid = new AtomicLong(0);
public final static Exception ok = new Exception("No prob");
protected RequestProcessor firstProcessor;
protected JvmPauseMonitor jvmPauseMonitor;
protected volatile State state = State.INITIAL;
protected boolean reconfigEnabled;

Expand Down Expand Up @@ -187,6 +189,20 @@ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
+ " snapdir " + txnLogFactory.getSnapDir());
}

/**
* Adds JvmPauseMonitor and calls
* {@link #ZooKeeperServer(FileTxnSnapLog, int, int, int, ZKDatabase, boolean)}
*
*/
public ZooKeeperServer(JvmPauseMonitor jvmPauseMonitor, FileTxnSnapLog txnLogFactory, int tickTime,
int minSessionTimeout, int maxSessionTimeout, ZKDatabase zkDb, boolean reconfigEnabled) {
this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, zkDb, reconfigEnabled);
this.jvmPauseMonitor = jvmPauseMonitor;
if(jvmPauseMonitor != null) {
LOG.info("Added JvmPauseMonitor to server");
}
}

/**
* creates a zookeeperserver instance.
* @param txnLogFactory the file transaction snapshot logging class
Expand Down Expand Up @@ -477,10 +493,18 @@ public synchronized void startup() {

registerJMX();

startJvmPauseMonitor();

setState(State.RUNNING);
notifyAll();
}

protected void startJvmPauseMonitor() {
if (this.jvmPauseMonitor != null) {
this.jvmPauseMonitor.serviceStart();
}
}

protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this,
Expand Down Expand Up @@ -585,6 +609,9 @@ public synchronized void shutdown(boolean fullyShutDown) {
if (firstProcessor != null) {
firstProcessor.shutdown();
}
if(jvmPauseMonitor != null) {
jvmPauseMonitor.serviceStop();
}

if (zkDb != null) {
if (fullyShutDown) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.zookeeper.server.persistence.FileTxnSnapLog.DatadirException;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.apache.zookeeper.server.util.JvmPauseMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -125,8 +126,13 @@ public void runFromConfig(ServerConfig config)
// run() in this thread.
// create a file logger url from the command line args
txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog,
config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null, QuorumPeerConfig.isReconfigEnabled());
JvmPauseMonitor jvmPauseMonitor = null;
if(config.jvmPauseMonitorToRun) {
jvmPauseMonitor = new JvmPauseMonitor(config);
}
final ZooKeeperServer zkServer = new ZooKeeperServer(jvmPauseMonitor, txnLog,
config.tickTime, config.minSessionTimeout, config.maxSessionTimeout,
null, QuorumPeerConfig.isReconfigEnabled());
txnLog.setServerStats(zkServer.serverStats());

// Registers shutdown handler which will be used to know the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.JvmPauseMonitor;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -127,6 +128,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
*/
private ZKDatabase zkDb;

private JvmPauseMonitor jvmPauseMonitor;
public static final class AddressTuple {
public final InetSocketAddress quorumAddr;
public final InetSocketAddress electionAddr;
Expand Down Expand Up @@ -468,6 +470,10 @@ public int getQuorumSize(){
return getVotingView().size();
}

public void setJvmPauseMonitor(JvmPauseMonitor jvmPauseMonitor) {
this.jvmPauseMonitor = jvmPauseMonitor;
}

/**
* QuorumVerifier implementation; default (majority).
*/
Expand Down Expand Up @@ -914,6 +920,7 @@ public synchronized void start() {
System.out.println(e);
}
startLeaderElection();
startJvmPauseMonitor();
super.start();
}

Expand Down Expand Up @@ -992,6 +999,12 @@ synchronized public void startLeaderElection() {
this.electionAlg = createElectionAlgorithm(electionType);
}

private void startJvmPauseMonitor() {
if (this.jvmPauseMonitor != null) {
this.jvmPauseMonitor.serviceStart();
}
}

/**
* Count the number of nodes in the map that could be followers.
* @param peers
Expand Down Expand Up @@ -1351,6 +1364,9 @@ public void shutdown() {
if(udpSocket != null) {
udpSocket.close();
}
if(jvmPauseMonitor != null) {
jvmPauseMonitor.serviceStop();
}

try {
adminServer.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.common.ClientX509Util;
import org.apache.zookeeper.common.StringUtils;
import org.apache.zookeeper.server.util.JvmPauseMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
Expand Down Expand Up @@ -114,6 +115,23 @@ public class QuorumPeerConfig {
*/
private final int MIN_SNAP_RETAIN_COUNT = 3;

/**
* JVM Pause Monitor feature switch
*/
protected boolean jvmPauseMonitorToRun = false;
/**
* JVM Pause Monitor warn threshold in ms
*/
protected long jvmPauseWarnThresholdMs = JvmPauseMonitor.WARN_THRESHOLD_DEFAULT;
/**
* JVM Pause Monitor info threshold in ms
*/
protected long jvmPauseInfoThresholdMs = JvmPauseMonitor.INFO_THRESHOLD_DEFAULT;
/**
* JVM Pause Monitor sleep time in ms
*/
protected long jvmPauseSleepTimeMs = JvmPauseMonitor.SLEEP_TIME_MS_DEFAULT;

@SuppressWarnings("serial")
public static class ConfigException extends Exception {
public ConfigException(String msg) {
Expand Down Expand Up @@ -328,6 +346,14 @@ public void parseProperties(Properties zkProp)
quorumServicePrincipal = value;
} else if (key.equals("quorum.cnxn.threads.size")) {
quorumCnxnThreadsSize = Integer.parseInt(value);
} else if (key.equals(JvmPauseMonitor.INFO_THRESHOLD_KEY)) {
jvmPauseInfoThresholdMs = Long.parseLong(value);
} else if (key.equals(JvmPauseMonitor.WARN_THRESHOLD_KEY)) {
jvmPauseWarnThresholdMs = Long.parseLong(value);
} else if (key.equals(JvmPauseMonitor.SLEEP_TIME_MS_KEY)) {
jvmPauseSleepTimeMs = Long.parseLong(value);
} else if (key.equals(JvmPauseMonitor.JVM_PAUSE_MONITOR_FEATURE_SWITCH_KEY)) {
jvmPauseMonitorToRun = Boolean.parseBoolean(value);
} else {
System.setProperty("zookeeper." + key, value);
}
Expand Down Expand Up @@ -412,7 +438,7 @@ public void parseProperties(Properties zkProp)
if (minSessionTimeout > maxSessionTimeout) {
throw new IllegalArgumentException(
"minSessionTimeout must not be larger than maxSessionTimeout");
}
}

// backward compatibility - dynamic configuration in the same file as
// static configuration params see writeDynamicConfig()
Expand Down Expand Up @@ -658,14 +684,14 @@ public static QuorumVerifier parseDynamicConfig(Properties dynamicConfigProp, in
/*
* If using FLE, then every server requires a separate election
* port.
*/
*/
if (eAlg != 0) {
for (QuorumServer s : qv.getVotingMembers().values()) {
if (s.electionAddr == null)
throw new IllegalArgumentException(
"Missing election port for server: " + s.id);
}
}
}
}
return qv;
}
Expand Down Expand Up @@ -786,6 +812,19 @@ public Map<Long,QuorumServer> getServers() {
return Collections.unmodifiableMap(quorumVerifier.getAllMembers());
}

public long getJvmPauseInfoThresholdMs() {
return jvmPauseInfoThresholdMs;
}
public long getJvmPauseWarnThresholdMs() {
return jvmPauseWarnThresholdMs;
}
public long getJvmPauseSleepTimeMs() {
return jvmPauseSleepTimeMs;
}
public boolean isJvmPauseMonitorToRun() {
return jvmPauseMonitorToRun;
}

public long getServerId() { return serverId; }

public boolean isDistributed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import javax.security.sasl.SaslException;

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.server.util.JvmPauseMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.jmx.ManagedUtil;
Expand Down Expand Up @@ -204,6 +205,10 @@ public void runFromConfig(QuorumPeerConfig config)
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();

if(config.jvmPauseMonitorToRun) {
quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
}

quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
Expand Down
Loading

0 comments on commit e61f059

Please sign in to comment.