From cb94efe4e0400c94542fcfc6a7f7390d05ba121a Mon Sep 17 00:00:00 2001 From: Mate Szalay-Beko Date: Mon, 23 Mar 2020 16:20:05 +0100 Subject: [PATCH] ZOOKEEPER-3756: Members slow to rejoin quorum using Kubernetes Whenever we close the current master ZooKeeper server, a new leader election is triggered. During the new election, a connection will be established between all the servers, by calling the synchronized 'connectOne' method in QuorumCnxManager. The method will open the socket and send a single small initial message to the other server, usually very quickly. If the destination host is unreachable, it should fail immediately. However, when we use Kubernetes, then the destination host is always reachable as it points to Kubernetes services. If the actual container / pod is not available then the 'socket.connect' method will timeout (by default after 5 sec) instead of failing immediately with NoRouteToHostException. As the 'connectOne' method is synchronized, this timeout will block the creation of other connections, so a single unreachable host can cause timeout in the leader election protocol. One workaround is to decrease the socket connection timeout with the '-Dzookeeper.cnxTimeout' stystem property, but the proper fix would be to make the connection initiation fully asynchronous, as using very low timeout can have its own side effect. Fortunately most of the initial message sending is already made async: the SASL authentication can take more time, so the second (authentication + initial message sending) part of the initiation protocol is already called in a separate thread, when Quorum SASL authentication is enabled. In the following patch I made the whole connection initiation async, by always using the async executor (not only when Quorum SASL is enabled) and also moving the socket.connect call into the async thread. I also created a unit test to verify my fix. I added a static socket factory that can be changed by the tests using a packet private setter method. My test failed (and produced the same error logs as we see in the original Jira ticket) before I applied my changes and a time-outed as no leader election succeeded after 15 seconds. After the changes the test runs very quickly, in 1-2 seconds. Note: due to the multiAddress changes, we will need different PRs to the branch 3.5 and to the 3.6+ branches. I will submit the other PR once this got reviewed. Author: Mate Szalay-Beko Author: Mate Szalay-Beko Reviewers: Enrico Olivelli , Norbert Kalmar Closes #1289 from symat/ZOOKEEPER-3756-master --- .../server/quorum/QuorumCnxManager.java | 170 ++++++++---------- ...CnxManagerSocketConnectionTimeoutTest.java | 112 ++++++++++++ 2 files changed, 188 insertions(+), 94 deletions(-) create mode 100644 zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumCnxManagerSocketConnectionTimeoutTest.java diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java index 66f6883470a..08384920125 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java @@ -27,7 +27,6 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.NoRouteToHostException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; @@ -58,6 +57,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import java.util.stream.Collectors; import javax.net.ssl.SSLSocket; import org.apache.zookeeper.common.NetUtils; @@ -186,6 +186,17 @@ public class QuorumCnxManager { */ private final boolean tcpKeepAlive = Boolean.getBoolean("zookeeper.tcpKeepAlive"); + + /* + * Socket factory, allowing the injection of custom socket implementations for testing + */ + static final Supplier DEFAULT_SOCKET_FACTORY = () -> new Socket(); + private static Supplier SOCKET_FACTORY = DEFAULT_SOCKET_FACTORY; + static void setSocketFactory(Supplier factory) { + SOCKET_FACTORY = factory; + } + + public static class Message { Message(ByteBuffer buffer, long sid) { @@ -316,41 +327,30 @@ public QuorumCnxManager(QuorumPeer self, final long mySid, Map(), daemonThFactory); + final ThreadFactory daemonThFactory = runnable -> new Thread(group, runnable, + String.format("QuorumConnectionThread-[myid=%d]-%d", mySid, threadIndex.getAndIncrement())); + + this.connectionExecutor = new ThreadPoolExecutor(3, quorumCnxnThreadsSize, 60, TimeUnit.SECONDS, + new SynchronousQueue<>(), daemonThFactory); this.connectionExecutor.allowCoreThreadTimeOut(true); } @@ -359,20 +359,49 @@ public Thread newThread(Runnable r) { * * @param sid */ - public void testInitiateConnection(long sid) throws Exception { + public void testInitiateConnection(long sid) { LOG.debug("Opening channel to server {}", sid); - Socket sock = new Socket(); - setSockOpts(sock); - InetSocketAddress address = self.getVotingView().get(sid).electionAddr.getReachableOrOne(); - sock.connect(address, cnxTO); - initiateConnection(sock, sid); + initiateConnection(self.getVotingView().get(sid).electionAddr, sid); } /** + * First we create the socket, perform SSL handshake and authentication if needed. + * Then we perform the initiation protocol. * If this server has initiated the connection, then it gives up on the * connection if it loses challenge. Otherwise, it keeps the connection. */ - public void initiateConnection(final Socket sock, final Long sid) { + public void initiateConnection(final MultipleAddresses electionAddr, final Long sid) { + Socket sock = null; + try { + LOG.debug("Opening channel to server {}", sid); + if (self.isSslQuorum()) { + sock = self.getX509Util().createSSLSocket(); + } else { + sock = SOCKET_FACTORY.get(); + } + setSockOpts(sock); + sock.connect(electionAddr.getReachableOrOne(), cnxTO); + if (sock instanceof SSLSocket) { + SSLSocket sslSock = (SSLSocket) sock; + sslSock.startHandshake(); + LOG.info("SSL handshake complete with {} - {} - {}", + sslSock.getRemoteSocketAddress(), + sslSock.getSession().getProtocol(), + sslSock.getSession().getCipherSuite()); + } + + LOG.debug("Connected to server {} using election address: {}:{}", + sid, sock.getInetAddress(), sock.getPort()); + } catch (X509Exception e) { + LOG.warn("Cannot open secure channel to {} at election address {}", sid, electionAddr, e); + closeSocket(sock); + return; + } catch (UnresolvedAddressException | IOException e) { + LOG.warn("Cannot open channel to {} at election address {}", sid, electionAddr, e); + closeSocket(sock); + return; + } + try { startConnection(sock, sid); } catch (IOException e) { @@ -389,16 +418,15 @@ public void initiateConnection(final Socket sock, final Long sid) { * Server will initiate the connection request to its peer server * asynchronously via separate connection thread. */ - public void initiateConnectionAsync(final Socket sock, final Long sid) { + public boolean initiateConnectionAsync(final MultipleAddresses electionAddr, final Long sid) { if (!inprogressConnections.add(sid)) { // simply return as there is a connection request to // server 'sid' already in progress. LOG.debug("Connection request to server id: {} is already in progress, so skipping this request", sid); - closeSocket(sock); - return; + return true; } try { - connectionExecutor.execute(new QuorumConnectionReqThread(sock, sid)); + connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid)); connectionThreadCnt.incrementAndGet(); } catch (Throwable e) { // Imp: Safer side catching all type of exceptions and remove 'sid' @@ -406,27 +434,27 @@ public void initiateConnectionAsync(final Socket sock, final Long sid) { // connection requests from this 'sid' in case of errors. inprogressConnections.remove(sid); LOG.error("Exception while submitting quorum connection request", e); - closeSocket(sock); + return false; } + return true; } /** * Thread to send connection request to peer server. */ private class QuorumConnectionReqThread extends ZooKeeperThread { - - final Socket sock; + final MultipleAddresses electionAddr; final Long sid; - QuorumConnectionReqThread(final Socket sock, final Long sid) { + QuorumConnectionReqThread(final MultipleAddresses electionAddr, final Long sid) { super("QuorumConnectionReqThread-" + sid); - this.sock = sock; + this.electionAddr = electionAddr; this.sid = sid; } @Override public void run() { try { - initiateConnection(sock, sid); + initiateConnection(electionAddr, sid); } finally { inprogressConnections.remove(sid); } @@ -679,6 +707,7 @@ public void toSend(Long sid, ByteBuffer b) { /** * Try to establish a connection to server with id sid using its electionAddr. + * The function will return quickly and the connection will be established asynchronously. * * VisibleForTesting. * @@ -697,62 +726,15 @@ synchronized boolean connectOne(long sid, MultipleAddresses electionAddr) { return true; } - Socket sock = null; - try { - LOG.debug("Opening channel to server {}", sid); - if (self.isSslQuorum()) { - sock = self.getX509Util().createSSLSocket(); - } else { - sock = new Socket(); - } - setSockOpts(sock); - sock.connect(electionAddr.getReachableOrOne(), cnxTO); - if (sock instanceof SSLSocket) { - SSLSocket sslSock = (SSLSocket) sock; - sslSock.startHandshake(); - LOG.info("SSL handshake complete with {} - {} - {}", - sslSock.getRemoteSocketAddress(), - sslSock.getSession().getProtocol(), - sslSock.getSession().getCipherSuite()); - } - - LOG.debug("Connected to server {} using election address: {}:{}", - sid, sock.getInetAddress(), sock.getPort()); - // Sends connection request asynchronously if the quorum - // sasl authentication is enabled. This is required because - // sasl server authentication process may take few seconds to - // finish, this may delay next peer connection requests. - if (quorumSaslAuthEnabled) { - initiateConnectionAsync(sock, sid); - } else { - initiateConnection(sock, sid); - } - return true; - } catch (UnresolvedAddressException e) { - // Sun doesn't include the address that causes this - // exception to be thrown, also UAE cannot be wrapped cleanly - // so we log the exception in order to capture this critical - // detail. - LOG.warn("Cannot open channel to {} at election address {}", sid, electionAddr, e); - closeSocket(sock); - throw e; - } catch (X509Exception e) { - LOG.warn("Cannot open secure channel to {} at election address {}", sid, electionAddr, e); - closeSocket(sock); - return false; - } catch (NoRouteToHostException e) { - LOG.warn("None of the addresses ({}) are reachable for sid {}", electionAddr, sid, e); - closeSocket(sock); - return false; - } catch (IOException e) { - LOG.warn("Cannot open channel to {} at election address {}", sid, electionAddr, e); - closeSocket(sock); - return false; - } + // we are doing connection initiation always asynchronously, since it is possible that + // the socket connection timeouts or the SSL handshake takes too long and don't want + // to keep the rest of the connections to wait + return initiateConnectionAsync(electionAddr, sid); } /** * Try to establish a connection to server with id sid. + * The function will return quickly and the connection will be established asynchronously. * * @param sid server id */ diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumCnxManagerSocketConnectionTimeoutTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumCnxManagerSocketConnectionTimeoutTest.java new file mode 100644 index 00000000000..ab1c077d9bd --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumCnxManagerSocketConnectionTimeoutTest.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketTimeoutException; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.test.QuorumUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QuorumCnxManagerSocketConnectionTimeoutTest extends ZKTestCase { + + private static final Logger LOG = LoggerFactory.getLogger(QuorumCnxManagerSocketConnectionTimeoutTest.class); + private QuorumUtil qu; + + @Before + public void setUp() throws Exception { + // starting a 3 node ensemble without observers + qu = new QuorumUtil(1, 2); + qu.startAll(); + } + + /** + * Testing an error case reported in ZOOKEEPER-3756: + * + * When a new leader election happens after a ZooKeeper server restarted, in Kubernetes + * the rest of the servers can not initiate connection to the restarted one. But they + * get SocketTimeoutException instead of immediate IOException. The Leader Election was + * time-outing quicker than the socket.connect call, so we ended up with cycles of broken + * leader elections. + * + * The fix was to make the connection initiation asynchronous, so one 'broken' connection + * doesn't make the whole leader election to be blocked, even in case of SocketTimeoutException. + * + * @throws Exception + */ + @Test + public void testSocketConnectionTimeoutDuringConnectingToElectionAddress() throws Exception { + + int leaderId = qu.getLeaderServer(); + + // use a custom socket factory that will cause timeout instead of connecting to the + // leader election port of the current leader + final InetSocketAddress leaderElectionAddress = + qu.getLeaderQuorumPeer().getElectionAddress().getOne(); + QuorumCnxManager.setSocketFactory(() -> new SocketStub(leaderElectionAddress)); + + qu.shutdown(leaderId); + + assertTrue("Timeout during waiting for current leader to go down", + ClientBase.waitForServerDown("127.0.0.1:" + qu.getPeer(leaderId).clientPort, + ClientBase.CONNECTION_TIMEOUT)); + + String errorMessage = "No new leader was elected"; + waitFor(errorMessage, () -> qu.leaderExists() && qu.getLeaderServer() != leaderId, 15); + } + + final class SocketStub extends Socket { + + private final InetSocketAddress addressToTimeout; + + SocketStub(InetSocketAddress addressToTimeout) { + this.addressToTimeout = addressToTimeout; + } + + @Override + public void connect(SocketAddress endpoint, int timeout) throws IOException { + if (addressToTimeout.equals(endpoint)) { + try { + Thread.sleep(timeout); + } catch (InterruptedException e) { + LOG.warn("interrupted SocketStub.connect", e); + } + throw new SocketTimeoutException("timeout reached in SocketStub.connect()"); + } + + super.connect(endpoint, timeout); + } + } + + @After + public void tearDown() throws Exception { + qu.shutdownAll(); + QuorumCnxManager.setSocketFactory(QuorumCnxManager.DEFAULT_SOCKET_FACTORY); + } + +} \ No newline at end of file