diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java index 66f6883470a..08384920125 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java @@ -27,7 +27,6 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.NoRouteToHostException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; @@ -58,6 +57,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import java.util.stream.Collectors; import javax.net.ssl.SSLSocket; import org.apache.zookeeper.common.NetUtils; @@ -186,6 +186,17 @@ public class QuorumCnxManager { */ private final boolean tcpKeepAlive = Boolean.getBoolean("zookeeper.tcpKeepAlive"); + + /* + * Socket factory, allowing the injection of custom socket implementations for testing + */ + static final Supplier DEFAULT_SOCKET_FACTORY = () -> new Socket(); + private static Supplier SOCKET_FACTORY = DEFAULT_SOCKET_FACTORY; + static void setSocketFactory(Supplier factory) { + SOCKET_FACTORY = factory; + } + + public static class Message { Message(ByteBuffer buffer, long sid) { @@ -316,41 +327,30 @@ public QuorumCnxManager(QuorumPeer self, final long mySid, Map(), daemonThFactory); + final ThreadFactory daemonThFactory = runnable -> new Thread(group, runnable, + String.format("QuorumConnectionThread-[myid=%d]-%d", mySid, threadIndex.getAndIncrement())); + + this.connectionExecutor = new ThreadPoolExecutor(3, quorumCnxnThreadsSize, 60, TimeUnit.SECONDS, + new SynchronousQueue<>(), daemonThFactory); this.connectionExecutor.allowCoreThreadTimeOut(true); } @@ -359,20 +359,49 @@ public Thread newThread(Runnable r) { * * @param sid */ - public void testInitiateConnection(long sid) throws Exception { + public void testInitiateConnection(long sid) { LOG.debug("Opening channel to server {}", sid); - Socket sock = new Socket(); - setSockOpts(sock); - InetSocketAddress address = self.getVotingView().get(sid).electionAddr.getReachableOrOne(); - sock.connect(address, cnxTO); - initiateConnection(sock, sid); + initiateConnection(self.getVotingView().get(sid).electionAddr, sid); } /** + * First we create the socket, perform SSL handshake and authentication if needed. + * Then we perform the initiation protocol. * If this server has initiated the connection, then it gives up on the * connection if it loses challenge. Otherwise, it keeps the connection. */ - public void initiateConnection(final Socket sock, final Long sid) { + public void initiateConnection(final MultipleAddresses electionAddr, final Long sid) { + Socket sock = null; + try { + LOG.debug("Opening channel to server {}", sid); + if (self.isSslQuorum()) { + sock = self.getX509Util().createSSLSocket(); + } else { + sock = SOCKET_FACTORY.get(); + } + setSockOpts(sock); + sock.connect(electionAddr.getReachableOrOne(), cnxTO); + if (sock instanceof SSLSocket) { + SSLSocket sslSock = (SSLSocket) sock; + sslSock.startHandshake(); + LOG.info("SSL handshake complete with {} - {} - {}", + sslSock.getRemoteSocketAddress(), + sslSock.getSession().getProtocol(), + sslSock.getSession().getCipherSuite()); + } + + LOG.debug("Connected to server {} using election address: {}:{}", + sid, sock.getInetAddress(), sock.getPort()); + } catch (X509Exception e) { + LOG.warn("Cannot open secure channel to {} at election address {}", sid, electionAddr, e); + closeSocket(sock); + return; + } catch (UnresolvedAddressException | IOException e) { + LOG.warn("Cannot open channel to {} at election address {}", sid, electionAddr, e); + closeSocket(sock); + return; + } + try { startConnection(sock, sid); } catch (IOException e) { @@ -389,16 +418,15 @@ public void initiateConnection(final Socket sock, final Long sid) { * Server will initiate the connection request to its peer server * asynchronously via separate connection thread. */ - public void initiateConnectionAsync(final Socket sock, final Long sid) { + public boolean initiateConnectionAsync(final MultipleAddresses electionAddr, final Long sid) { if (!inprogressConnections.add(sid)) { // simply return as there is a connection request to // server 'sid' already in progress. LOG.debug("Connection request to server id: {} is already in progress, so skipping this request", sid); - closeSocket(sock); - return; + return true; } try { - connectionExecutor.execute(new QuorumConnectionReqThread(sock, sid)); + connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid)); connectionThreadCnt.incrementAndGet(); } catch (Throwable e) { // Imp: Safer side catching all type of exceptions and remove 'sid' @@ -406,27 +434,27 @@ public void initiateConnectionAsync(final Socket sock, final Long sid) { // connection requests from this 'sid' in case of errors. inprogressConnections.remove(sid); LOG.error("Exception while submitting quorum connection request", e); - closeSocket(sock); + return false; } + return true; } /** * Thread to send connection request to peer server. */ private class QuorumConnectionReqThread extends ZooKeeperThread { - - final Socket sock; + final MultipleAddresses electionAddr; final Long sid; - QuorumConnectionReqThread(final Socket sock, final Long sid) { + QuorumConnectionReqThread(final MultipleAddresses electionAddr, final Long sid) { super("QuorumConnectionReqThread-" + sid); - this.sock = sock; + this.electionAddr = electionAddr; this.sid = sid; } @Override public void run() { try { - initiateConnection(sock, sid); + initiateConnection(electionAddr, sid); } finally { inprogressConnections.remove(sid); } @@ -679,6 +707,7 @@ public void toSend(Long sid, ByteBuffer b) { /** * Try to establish a connection to server with id sid using its electionAddr. + * The function will return quickly and the connection will be established asynchronously. * * VisibleForTesting. * @@ -697,62 +726,15 @@ synchronized boolean connectOne(long sid, MultipleAddresses electionAddr) { return true; } - Socket sock = null; - try { - LOG.debug("Opening channel to server {}", sid); - if (self.isSslQuorum()) { - sock = self.getX509Util().createSSLSocket(); - } else { - sock = new Socket(); - } - setSockOpts(sock); - sock.connect(electionAddr.getReachableOrOne(), cnxTO); - if (sock instanceof SSLSocket) { - SSLSocket sslSock = (SSLSocket) sock; - sslSock.startHandshake(); - LOG.info("SSL handshake complete with {} - {} - {}", - sslSock.getRemoteSocketAddress(), - sslSock.getSession().getProtocol(), - sslSock.getSession().getCipherSuite()); - } - - LOG.debug("Connected to server {} using election address: {}:{}", - sid, sock.getInetAddress(), sock.getPort()); - // Sends connection request asynchronously if the quorum - // sasl authentication is enabled. This is required because - // sasl server authentication process may take few seconds to - // finish, this may delay next peer connection requests. - if (quorumSaslAuthEnabled) { - initiateConnectionAsync(sock, sid); - } else { - initiateConnection(sock, sid); - } - return true; - } catch (UnresolvedAddressException e) { - // Sun doesn't include the address that causes this - // exception to be thrown, also UAE cannot be wrapped cleanly - // so we log the exception in order to capture this critical - // detail. - LOG.warn("Cannot open channel to {} at election address {}", sid, electionAddr, e); - closeSocket(sock); - throw e; - } catch (X509Exception e) { - LOG.warn("Cannot open secure channel to {} at election address {}", sid, electionAddr, e); - closeSocket(sock); - return false; - } catch (NoRouteToHostException e) { - LOG.warn("None of the addresses ({}) are reachable for sid {}", electionAddr, sid, e); - closeSocket(sock); - return false; - } catch (IOException e) { - LOG.warn("Cannot open channel to {} at election address {}", sid, electionAddr, e); - closeSocket(sock); - return false; - } + // we are doing connection initiation always asynchronously, since it is possible that + // the socket connection timeouts or the SSL handshake takes too long and don't want + // to keep the rest of the connections to wait + return initiateConnectionAsync(electionAddr, sid); } /** * Try to establish a connection to server with id sid. + * The function will return quickly and the connection will be established asynchronously. * * @param sid server id */ diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumCnxManagerSocketConnectionTimeoutTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumCnxManagerSocketConnectionTimeoutTest.java new file mode 100644 index 00000000000..ab1c077d9bd --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumCnxManagerSocketConnectionTimeoutTest.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketTimeoutException; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.test.QuorumUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QuorumCnxManagerSocketConnectionTimeoutTest extends ZKTestCase { + + private static final Logger LOG = LoggerFactory.getLogger(QuorumCnxManagerSocketConnectionTimeoutTest.class); + private QuorumUtil qu; + + @Before + public void setUp() throws Exception { + // starting a 3 node ensemble without observers + qu = new QuorumUtil(1, 2); + qu.startAll(); + } + + /** + * Testing an error case reported in ZOOKEEPER-3756: + * + * When a new leader election happens after a ZooKeeper server restarted, in Kubernetes + * the rest of the servers can not initiate connection to the restarted one. But they + * get SocketTimeoutException instead of immediate IOException. The Leader Election was + * time-outing quicker than the socket.connect call, so we ended up with cycles of broken + * leader elections. + * + * The fix was to make the connection initiation asynchronous, so one 'broken' connection + * doesn't make the whole leader election to be blocked, even in case of SocketTimeoutException. + * + * @throws Exception + */ + @Test + public void testSocketConnectionTimeoutDuringConnectingToElectionAddress() throws Exception { + + int leaderId = qu.getLeaderServer(); + + // use a custom socket factory that will cause timeout instead of connecting to the + // leader election port of the current leader + final InetSocketAddress leaderElectionAddress = + qu.getLeaderQuorumPeer().getElectionAddress().getOne(); + QuorumCnxManager.setSocketFactory(() -> new SocketStub(leaderElectionAddress)); + + qu.shutdown(leaderId); + + assertTrue("Timeout during waiting for current leader to go down", + ClientBase.waitForServerDown("127.0.0.1:" + qu.getPeer(leaderId).clientPort, + ClientBase.CONNECTION_TIMEOUT)); + + String errorMessage = "No new leader was elected"; + waitFor(errorMessage, () -> qu.leaderExists() && qu.getLeaderServer() != leaderId, 15); + } + + final class SocketStub extends Socket { + + private final InetSocketAddress addressToTimeout; + + SocketStub(InetSocketAddress addressToTimeout) { + this.addressToTimeout = addressToTimeout; + } + + @Override + public void connect(SocketAddress endpoint, int timeout) throws IOException { + if (addressToTimeout.equals(endpoint)) { + try { + Thread.sleep(timeout); + } catch (InterruptedException e) { + LOG.warn("interrupted SocketStub.connect", e); + } + throw new SocketTimeoutException("timeout reached in SocketStub.connect()"); + } + + super.connect(endpoint, timeout); + } + } + + @After + public void tearDown() throws Exception { + qu.shutdownAll(); + QuorumCnxManager.setSocketFactory(QuorumCnxManager.DEFAULT_SOCKET_FACTORY); + } + +} \ No newline at end of file