diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java index d3d8e853a52..d43fb4ad93a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java @@ -782,6 +782,11 @@ void queueCallback(AsyncCallback cb, int rc, String path, Object ctx) { eventThread.queueCallback(cb, rc, path, ctx); } + // for test only + protected void onConnecting(InetSocketAddress addr) { + + } + private void conLossPacket(Packet p) { if (p.replyHeader == null) { return; @@ -876,7 +881,7 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException { case AUTHPACKET_XID: LOG.debug("Got auth session id: 0x{}", Long.toHexString(sessionId)); if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { - state = States.AUTH_FAILED; + setZkState(States.AUTH_FAILED); eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null)); eventThread.queueEventOfDeath(); @@ -957,7 +962,11 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException { SendThread(ClientCnxnSocket clientCnxnSocket) { super(makeThreadName("-SendThread()")); - state = States.CONNECTING; + try { + setZkState(States.CONNECTING); + } catch (IOException e) { + throw new RuntimeException("Connection setup failed when migrate state to CONNECTING"); + } this.clientCnxnSocket = clientCnxnSocket; setDaemon(true); } @@ -971,10 +980,20 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException { * * @return */ - ZooKeeper.States getZkState() { + synchronized ZooKeeper.States getZkState() { return state; } + synchronized void setZkState(ZooKeeper.States newState) throws IOException { + if (!state.isAlive()) { + if (newState == States.CONNECTING) { + throw new IOException( + "Connection has already been closed and reconnection is not allowed"); + } + } + state = newState; + } + ClientCnxnSocket getClientCnxnSocket() { return clientCnxnSocket; } @@ -1121,7 +1140,7 @@ private void startConnect(InetSocketAddress addr) throws IOException { LOG.warn("Unexpected exception", e); } } - state = States.CONNECTING; + setZkState(States.CONNECTING); String hostPort = addr.getHostString() + ":" + addr.getPort(); MDC.put("myid", hostPort); @@ -1179,6 +1198,7 @@ public void run() { } else { serverAddress = hostProvider.next(1000); } + ClientCnxn.this.onConnecting(serverAddress); startConnect(serverAddress); clientCnxnSocket.updateLastSendAndHeard(); } @@ -1192,7 +1212,7 @@ public void run() { zooKeeperSaslClient.initialize(ClientCnxn.this); } catch (SaslException e) { LOG.error("SASL authentication with Zookeeper Quorum member failed.", e); - state = States.AUTH_FAILED; + setZkState(States.AUTH_FAILED); sendAuthEvent = true; } } @@ -1200,7 +1220,7 @@ public void run() { if (authState != null) { if (authState == KeeperState.AuthFailed) { // An authentication error occurred during authentication with the Zookeeper Server. - state = States.AUTH_FAILED; + setZkState(States.AUTH_FAILED); sendAuthEvent = true; } else { if (authState == KeeperState.SaslAuthenticated) { @@ -1394,7 +1414,7 @@ void onConnected( boolean isRO) throws IOException { negotiatedSessionTimeout = _negotiatedSessionTimeout; if (negotiatedSessionTimeout <= 0) { - state = States.CLOSED; + setZkState(States.CLOSED); eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null)); eventThread.queueEventOfDeath(); @@ -1415,7 +1435,7 @@ void onConnected( hostProvider.onConnected(); sessionId = _sessionId; sessionPasswd = _sessionPasswd; - state = (isRO) ? States.CONNECTEDREADONLY : States.CONNECTED; + setZkState((isRO) ? States.CONNECTEDREADONLY : States.CONNECTED); seenRwServerBefore |= !isRO; LOG.info( "Session establishment complete on server {}, session id = 0x{}, negotiated timeout = {}{}", @@ -1428,7 +1448,10 @@ void onConnected( } void close() { - state = States.CLOSED; + try { + setZkState(States.CLOSED); + } catch (IOException e) { + } clientCnxnSocket.onClosing(); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java new file mode 100644 index 00000000000..63788fe6c92 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Queue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.zookeeper.ClientCnxn.Packet; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.client.HostProvider; +import org.apache.zookeeper.client.ZKClientConfig; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.quorum.QuorumPeerTestBase; +import org.apache.zookeeper.test.ClientBase; +import org.junit.Assert; +import org.junit.Test; + +public class ClientCnxnSocketFragilityTest extends QuorumPeerTestBase { + + private static final int SERVER_COUNT = 3; + + private static final int SESSION_TIMEOUT = 40000; + + public static final int CONNECTION_TIMEOUT = 30000; + + private final UnsafeCoordinator unsafeCoordinator = new UnsafeCoordinator(); + + private volatile CustomZooKeeper zk = null; + + private volatile FragileClientCnxnSocketNIO socket = null; + + private volatile CustomClientCnxn cnxn = null; + + private String getCxnString(int[] clientPorts) { + StringBuffer hostPortBuffer = new StringBuffer(); + for (int i = 0; i < clientPorts.length; i++) { + hostPortBuffer.append("127.0.0.1:"); + hostPortBuffer.append(clientPorts[i]); + if (i != (clientPorts.length - 1)) { + hostPortBuffer.append(','); + } + } + return hostPortBuffer.toString(); + } + + private void closeZookeeper(ZooKeeper zk) { + Executors.newSingleThreadExecutor().submit(() -> { + try { + LOG.info("closeZookeeper is fired"); + zk.close(); + } catch (InterruptedException e) { + } + }); + } + + private void getDataBackgroundRetryForever(CustomZooKeeper zk, String path) { + new Thread(() -> { + for (;;) { + try { + zk.getData(path, false, new Stat()); + } catch (Exception e) { + } + } + }).start(); + } + + @Test + public void testClientCnxnSocketFragility() throws Exception { + System.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, FragileClientCnxnSocketNIO.class.getName()); + System.setProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT, "1000"); + final int[] clientPorts = new int[SERVER_COUNT]; + StringBuilder sb = new StringBuilder(); + String server; + + for (int i = 0; i < SERVER_COUNT; i++) { + clientPorts[i] = PortAssignment.unique(); + server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + + ":participant;127.0.0.1:" + clientPorts[i]; + sb.append(server + "\n"); + } + String currentQuorumCfgSection = sb.toString(); + MainThread[] mt = new MainThread[SERVER_COUNT]; + + for (int i = 0; i < SERVER_COUNT; i++) { + mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false); + mt[i].start(); + } + + // Ensure server started + for (int i = 0; i < SERVER_COUNT; i++) { + Assert.assertTrue( + "waiting for server " + i + " being up", + ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT)); + } + String path = "/testClientCnxnSocketFragility"; + String data = "balabala"; + ClientWatcher watcher = new ClientWatcher(); + zk = new CustomZooKeeper(getCxnString(clientPorts), SESSION_TIMEOUT, watcher); + watcher.watchFor(zk); + + // Let's see some successful operations + zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + Assert.assertEquals(new String(zk.getData(path, false, new Stat())), data); + Assert.assertTrue(!watcher.isSessionExpired()); + + // Let's make a broken operation + socket.mute(); + boolean catchKeeperException = false; + try { + zk.getData(path, false, new Stat()); + } catch (KeeperException e) { + catchKeeperException = true; + Assert.assertFalse(e instanceof KeeperException.SessionExpiredException); + } + socket.unmute(); + Assert.assertTrue(catchKeeperException); + Assert.assertTrue(!watcher.isSessionExpired()); + + getDataBackgroundRetryForever(zk, path); + // Let's make a broken network + socket.mute(); + + // Let's attempt to close ZooKeeper + cnxn.attemptClose(); + + // Wait some time to expect continuous reconnecting. + // We try to make reconnecting hit the unsafe point. + TimeUnit.MILLISECONDS.sleep(3000); + + // close zk with timeout 1000 milli seconds + closeZookeeper(zk); + TimeUnit.MILLISECONDS.sleep(3000); + + // Since we already close zookeeper, we expect that the zk should not be alive. + Assert.assertTrue(!zk.isAlive()); + Assert.assertTrue(!watcher.isSessionExpired()); + + for (int i = 0; i < SERVER_COUNT; i++) { + mt[i].shutdown(); + } + } + + public static class FragileClientCnxnSocketNIO extends ClientCnxnSocketNIO { + + private volatile boolean mute; + + public FragileClientCnxnSocketNIO(ZKClientConfig clientConfig) throws IOException { + super(clientConfig); + mute = false; + } + + synchronized void mute() { + if (!mute) { + LOG.info("Fire socket mute"); + mute = true; + } + } + + synchronized void unmute() { + if (mute) { + LOG.info("Fire socket unmute"); + mute = false; + } + } + + @Override + void doTransport(int waitTimeOut, Queue pendingQueue, ClientCnxn cnxn) + throws IOException, InterruptedException { + if (mute) { + throw new IOException("Socket is mute"); + } + super.doTransport(waitTimeOut, pendingQueue, cnxn); + } + + @Override + void connect(InetSocketAddress addr) throws IOException { + if (mute) { + throw new IOException("Socket is mute"); + } + super.connect(addr); + } + } + + class ClientWatcher implements Watcher { + + private ZooKeeper zk; + + private boolean sessionExpired = false; + + void watchFor(ZooKeeper zk) { + this.zk = zk; + } + + @Override + public void process(WatchedEvent event) { + LOG.info("Watcher got {}", event); + if (event.getState() == KeeperState.Expired) { + sessionExpired = true; + } + } + + boolean isSessionExpired() { + return sessionExpired; + } + } + + // Coordinate to construct the risky scenario. + class UnsafeCoordinator { + + private CountDownLatch syncLatch = new CountDownLatch(2); + + void sync(boolean closing) { + LOG.info("Attempt to sync with {}", closing); + if (closing) { + syncLatch.countDown(); + try { + syncLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + } + + class CustomClientCnxn extends ClientCnxn { + + private volatile boolean closing = false; + + public CustomClientCnxn( + String chrootPath, + HostProvider hostProvider, + int sessionTimeout, + ZooKeeper zooKeeper, + ClientWatchManager watcher, + ClientCnxnSocket clientCnxnSocket, + boolean canBeReadOnly) throws IOException { + super(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly); + } + + void attemptClose() { + closing = true; + } + + @Override + protected void onConnecting(InetSocketAddress addr) { + if (closing) { + LOG.info("Attempt to connnecting {} {} {}", addr, closing, state); + ///////// Unsafe Region //////// + // Slow down and zoom out the unsafe point to make risk + // The unsafe point is that startConnect happens after sendThread.close + unsafeCoordinator.sync(closing); + //////////////////////////////// + } + } + + @Override + public void disconnect() { + Assert.assertTrue(closing); + LOG.info("Attempt to disconnecting client for session: 0x{} {} {}", Long.toHexString(getSessionId()), closing, state); + sendThread.close(); + ///////// Unsafe Region //////// + unsafeCoordinator.sync(closing); + //////////////////////////////// + try { + sendThread.join(); + } catch (InterruptedException ex) { + LOG.warn("Got interrupted while waiting for the sender thread to close", ex); + } + eventThread.queueEventOfDeath(); + if (zooKeeperSaslClient != null) { + zooKeeperSaslClient.shutdown(); + } + } + } + + class CustomZooKeeper extends ZooKeeper { + + public CustomZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException { + super(connectString, sessionTimeout, watcher); + } + + public boolean isAlive() { + return cnxn.getState().isAlive(); + } + + @Override + protected ClientCnxn createConnection( + String chrootPath, + HostProvider hostProvider, + int sessionTimeout, + ZooKeeper zooKeeper, + ClientWatchManager watcher, + ClientCnxnSocket clientCnxnSocket, + boolean canBeReadOnly) throws IOException { + Assert.assertTrue(clientCnxnSocket instanceof FragileClientCnxnSocketNIO); + socket = (FragileClientCnxnSocketNIO) clientCnxnSocket; + ClientCnxnSocketFragilityTest.this.cnxn = new CustomClientCnxn(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly); + return ClientCnxnSocketFragilityTest.this.cnxn; + } + } +} \ No newline at end of file