Skip to content

Commit

Permalink
ZOOKEEPER-3172: Quorum TLS - fix port unification to allow rolling up…
Browse files Browse the repository at this point in the history
…grades
  • Loading branch information
ivmaykov committed Oct 26, 2018
1 parent 9db7d4f commit 807954a
Show file tree
Hide file tree
Showing 12 changed files with 1,394 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import javax.net.ssl.X509ExtendedTrustManager;
import javax.net.ssl.X509KeyManager;
import javax.net.ssl.X509TrustManager;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -82,6 +84,8 @@ public abstract class X509Util {
"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256"
};

public static final int DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS = 5000;

/**
* This enum represents the file type of a KeyStore or TrustStore. Currently, JKS (java keystore) and PEM types
* are supported.
Expand Down Expand Up @@ -135,6 +139,7 @@ public static StoreFileType fromPropertyValue(String prop) {
private String sslHostnameVerificationEnabledProperty = getConfigPrefix() + "hostnameVerification";
private String sslCrlEnabledProperty = getConfigPrefix() + "crl";
private String sslOcspEnabledProperty = getConfigPrefix() + "ocsp";
private String sslHandshakeDetectionTimeoutMillisProperty = getConfigPrefix() + "handshakeDetectionTimeoutMillis";

private String[] cipherSuites;

Expand Down Expand Up @@ -196,6 +201,16 @@ public String getSslOcspEnabledProperty() {
return sslOcspEnabledProperty;
}

/**
* Returns the config property key that controls the amount of time, in milliseconds, that the first
* UnifiedServerSocket read operation will block for when trying to detect the client mode (TLS or PLAINTEXT).
*
* @return the config property key.
*/
public String getSslHandshakeDetectionTimeoutMillisProperty() {
return sslHandshakeDetectionTimeoutMillisProperty;
}

public SSLContext getDefaultSSLContext() throws X509Exception.SSLContextException {
SSLContext result = defaultSSLContext.get();
if (result == null) {
Expand All @@ -218,6 +233,31 @@ private SSLContext createSSLContext() throws SSLContextException {
return createSSLContext(config);
}

/**
* Returns the max amount of time, in milliseconds, that the first UnifiedServerSocket read() operation should
* block for when trying to detect the client mode (TLS or PLAINTEXT).
* Defaults to {@link X509Util#DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS}.
*
* @return the handshake detection timeout, in milliseconds.
*/
public int getSslHandshakeTimeoutMillis() {
String propertyString = System.getProperty(getSslHandshakeDetectionTimeoutMillisProperty());
int result;
if (propertyString == null) {
result = DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS;
} else {
result = Integer.parseInt(propertyString);
if (result < 1) {
// Timeout of 0 is not allowed, since an infinite timeout can permanently lock up an
// accept() thread.
LOG.warn("Invalid value for " + getSslHandshakeDetectionTimeoutMillisProperty() + ": " + result +
", using the default value of " + DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS);
result = DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS;
}
}
return result;
}

public SSLContext createSSLContext(ZKConfig config) throws SSLContextException {
KeyManager[] keyManagers = null;
TrustManager[] trustManagers = null;
Expand Down Expand Up @@ -427,14 +467,22 @@ public static X509TrustManager createTrustManager(String trustStoreLocation, Str
public SSLSocket createSSLSocket() throws X509Exception, IOException {
SSLSocket sslSocket = (SSLSocket) getDefaultSSLContext().getSocketFactory().createSocket();
configureSSLSocket(sslSocket);

sslSocket.setUseClientMode(true);
return sslSocket;
}

public SSLSocket createSSLSocket(Socket socket) throws X509Exception, IOException {
SSLSocket sslSocket = (SSLSocket) getDefaultSSLContext().getSocketFactory().createSocket(socket, null, socket.getPort(), true);
public SSLSocket createSSLSocket(Socket socket, byte[] pushbackBytes) throws X509Exception, IOException {
SSLSocket sslSocket;
if (pushbackBytes != null && pushbackBytes.length > 0) {
sslSocket = (SSLSocket) getDefaultSSLContext().getSocketFactory().createSocket(
socket, new ByteArrayInputStream(pushbackBytes), true);
} else {
sslSocket = (SSLSocket) getDefaultSSLContext().getSocketFactory().createSocket(
socket, null, socket.getPort(), true);
}
configureSSLSocket(sslSocket);

sslSocket.setUseClientMode(false);
sslSocket.setNeedClientAuth(true);
return sslSocket;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ private void putSSLProperties(X509Util x509Util) {
System.getProperty(x509Util.getSslCrlEnabledProperty()));
properties.put(x509Util.getSslOcspEnabledProperty(),
System.getProperty(x509Util.getSslOcspEnabledProperty()));
properties.put(x509Util.getSslHandshakeDetectionTimeoutMillisProperty(),
System.getProperty(x509Util.getSslHandshakeDetectionTimeoutMillisProperty()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import javax.security.sasl.SaslException;

import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.common.QuorumX509Util;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.server.FinalRequestProcessor;
Expand Down Expand Up @@ -234,21 +233,16 @@ public boolean isQuorumSynced(QuorumVerifier qv) {

private final ServerSocket ss;

Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException, X509Exception {
Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
this.self = self;
this.proposalStats = new BufferStats();
try {
if (self.shouldUsePortUnification()) {
if (self.shouldUsePortUnification() || self.isSslQuorum()) {
boolean allowInsecureConnection = self.shouldUsePortUnification();
if (self.getQuorumListenOnAllIPs()) {
ss = new UnifiedServerSocket(new QuorumX509Util(), self.getQuorumAddress().getPort());
ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection, self.getQuorumAddress().getPort());
} else {
ss = new UnifiedServerSocket(new QuorumX509Util());
}
} else if (self.isSslQuorum()) {
if (self.getQuorumListenOnAllIPs()) {
ss = new QuorumX509Util().createSSLServerSocket(self.getQuorumAddress().getPort());
} else {
ss = new QuorumX509Util().createSSLServerSocket();
ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection);
}
} else {
if (self.getQuorumListenOnAllIPs()) {
Expand All @@ -261,9 +255,6 @@ public boolean isQuorumSynced(QuorumVerifier qv) {
if (!self.getQuorumListenOnAllIPs()) {
ss.bind(self.getQuorumAddress());
}
} catch (X509Exception e) {
LOG.error("Failed to setup ssl server socket", e);
throw e;
} catch (BindException e) {
if (self.getQuorumListenOnAllIPs()) {
LOG.error("Couldn't bind to port " + self.getQuorumAddress().getPort(), e);
Expand Down Expand Up @@ -399,8 +390,10 @@ public LearnerCnxAcceptor() {
public void run() {
try {
while (!stop) {
try{
Socket s = ss.accept();
Socket s = null;
boolean error = false;
try {
s = ss.accept();

// start with the initLimit, once the ack is processed
// in LearnerHandler switch to the syncLimit
Expand All @@ -412,6 +405,7 @@ public void run() {
LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
fh.start();
} catch (SocketException e) {
error = true;
if (stop) {
LOG.info("exception while shutting down acceptor: "
+ e);
Expand All @@ -425,6 +419,19 @@ public void run() {
}
} catch (SaslException e){
LOG.error("Exception while connecting to quorum learner", e);
error = true;
} catch (Exception e) {
error = true;
throw e;
} finally {
// Don't leak sockets on errors
if (error && s != null && !s.isClosed()) {
try {
s.close();
} catch (IOException e) {
LOG.warn("Error closing socket", e);
}
}
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.common.QuorumX509Util;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.common.X509Util;
import org.apache.zookeeper.server.ExitCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -74,8 +72,6 @@ static class PacketInFlight {

protected Socket sock;

protected X509Util x509Util;

/**
* Socket getter
* @return
Expand Down Expand Up @@ -304,10 +300,7 @@ protected void connectToLeader(InetSocketAddress addr, String hostname)
private Socket createSocket() throws X509Exception, IOException {
Socket sock;
if (self.isSslQuorum()) {
if (x509Util == null) {
x509Util = new QuorumX509Util();
}
sock = x509Util.createSSLSocket();
sock = self.getX509Util().createSSLSocket();
} else {
sock = new Socket();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,47 @@

package org.apache.zookeeper.server.quorum;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.PushbackInputStream;
import java.net.Socket;
import java.net.SocketImpl;

public class PrependableSocket extends Socket {

private SequenceInputStream sequenceInputStream;
private PushbackInputStream pushbackInputStream;

public PrependableSocket(SocketImpl base) throws IOException {
super(base);
}

@Override
public InputStream getInputStream() throws IOException {
if (sequenceInputStream == null) {
if (pushbackInputStream == null) {
return super.getInputStream();
}

return sequenceInputStream;
return pushbackInputStream;
}

public void prependToInputStream(byte[] bytes) throws IOException {
sequenceInputStream = new SequenceInputStream(new ByteArrayInputStream(bytes), getInputStream());
/**
* Prepend some bytes that have already been read back to the socket's input stream. Note that this method can be
* called at most once with a non-0 length per socket instance.
* @param bytes the bytes to prepend.
* @param offset offset in the byte array to start at.
* @param length number of bytes to prepend.
* @throws IOException if this method was already called on the socket instance, or if super.getInputStream() throws.
*/
public void prependToInputStream(byte[] bytes, int offset, int length) throws IOException {
if (length == 0) {
return; // nothing to prepend
}
if (pushbackInputStream != null) {
throw new IOException("prependToInputStream() called more than once");
}
PushbackInputStream pushbackInputStream = new PushbackInputStream(getInputStream(), length);
pushbackInputStream.unread(bytes, offset, length);
this.pushbackInputStream = pushbackInputStream;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.zookeeper.common.QuorumX509Util;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.common.X509Util;
import org.apache.zookeeper.server.ExitCode;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.apache.zookeeper.server.util.ConfigUtils;
Expand Down Expand Up @@ -175,9 +173,6 @@ public class QuorumCnxManager {
*/
private final boolean tcpKeepAlive = Boolean.getBoolean("zookeeper.tcpKeepAlive");


private X509Util x509Util;

static public class Message {
Message(ByteBuffer buffer, long sid) {
this.buffer = buffer;
Expand Down Expand Up @@ -291,8 +286,6 @@ public QuorumCnxManager(QuorumPeer self,
// Starts listener thread that waits for connection requests
listener = new Listener();
listener.setName("QuorumPeerListener");

x509Util = new QuorumX509Util();
}

private void initializeAuth(final long mySid,
Expand Down Expand Up @@ -655,17 +648,18 @@ synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr
try {
LOG.debug("Opening channel to server " + sid);
if (self.isSslQuorum()) {
SSLSocket sslSock = x509Util.createSSLSocket();
setSockOpts(sslSock);
sslSock.connect(electionAddr, cnxTO);
sslSock.startHandshake();
sock = sslSock;
} else {
sock = new Socket();
setSockOpts(sock);
sock.connect(electionAddr, cnxTO);
}
LOG.debug("Connected to server " + sid);
SSLSocket sslSock = self.getX509Util().createSSLSocket();
setSockOpts(sslSock);
sslSock.connect(electionAddr, cnxTO);
sslSock.startHandshake();
sock = sslSock;
} else {
sock = new Socket();
setSockOpts(sock);
sock.connect(electionAddr, cnxTO);

}
LOG.debug("Connected to server " + sid);
// Sends connection request asynchronously if the quorum
// sasl authentication is enabled. This is required because
// sasl server authentication process may take few seconds to
Expand Down Expand Up @@ -876,9 +870,9 @@ public void run() {
while((!shutdown) && (numRetries < 3)){
try {
if (self.shouldUsePortUnification()) {
ss = new UnifiedServerSocket(x509Util);
ss = new UnifiedServerSocket(self.getX509Util(), true);
} else if (self.isSslQuorum()) {
ss = x509Util.createSSLServerSocket();
ss = new UnifiedServerSocket(self.getX509Util(), false);
} else {
ss = new ServerSocket();
}
Expand Down Expand Up @@ -920,7 +914,7 @@ public void run() {
+ "see ZOOKEEPER-2836");
}
}
} catch (IOException|X509Exception e) {
} catch (IOException e) {
if (shutdown) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.zookeeper.KeeperException.BadArgumentsException;
import org.apache.zookeeper.common.AtomicFileWritingIdiom;
import org.apache.zookeeper.common.AtomicFileWritingIdiom.WriterStatement;
import org.apache.zookeeper.common.QuorumX509Util;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.jmx.MBeanRegistry;
Expand Down Expand Up @@ -479,6 +480,12 @@ public boolean shouldUsePortUnification() {
return shouldUsePortUnification;
}

private final QuorumX509Util x509Util;

QuorumX509Util getX509Util() {
return x509Util;
}

/**
* This is who I think the leader currently is.
*/
Expand Down Expand Up @@ -801,6 +808,7 @@ public QuorumPeer() throws SaslException {
quorumStats = new QuorumStats(this);
jmxRemotePeerBean = new HashMap<Long, RemotePeerBean>();
adminServer = AdminServerFactory.createAdminServer();
x509Util = new QuorumX509Util();
initialize();
}

Expand Down
Loading

0 comments on commit 807954a

Please sign in to comment.