diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java index b2c5a57a852..a727082d75a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java @@ -418,6 +418,7 @@ public ClientCnxn( * @param canBeReadOnly * whether the connection is allowed to go to read-only * mode in case of partitioning + * @throws IOException in cases of broken network */ public ClientCnxn( String chrootPath, @@ -428,7 +429,7 @@ public ClientCnxn( ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, - boolean canBeReadOnly) { + boolean canBeReadOnly) throws IOException { this.zooKeeper = zooKeeper; this.watcher = watcher; this.sessionId = sessionId; @@ -795,6 +796,11 @@ void queueCallback(AsyncCallback cb, int rc, String path, Object ctx) { eventThread.queueCallback(cb, rc, path, ctx); } + // for test only + protected void onConnecting(InetSocketAddress addr) { + + } + private void conLossPacket(Packet p) { if (p.replyHeader == null) { return; @@ -888,7 +894,7 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException { case AUTHPACKET_XID: LOG.debug("Got auth session id: 0x{}", Long.toHexString(sessionId)); if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { - state = States.AUTH_FAILED; + changeZkState(States.AUTH_FAILED); eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null)); eventThread.queueEventOfDeath(); @@ -967,9 +973,9 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException { } } - SendThread(ClientCnxnSocket clientCnxnSocket) { + SendThread(ClientCnxnSocket clientCnxnSocket) throws IOException { super(makeThreadName("-SendThread()")); - state = States.CONNECTING; + changeZkState(States.CONNECTING); this.clientCnxnSocket = clientCnxnSocket; setDaemon(true); } @@ -983,10 +989,19 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException { * * @return */ - ZooKeeper.States getZkState() { + synchronized ZooKeeper.States getZkState() { return state; } + synchronized void changeZkState(ZooKeeper.States newState) throws IOException { + if (!state.isAlive() && newState == States.CONNECTING) { + throw new IOException( + "Connection has already been closed and reconnection is not allowed"); + } + // It's safer to place state modification at the end. + state = newState; + } + ClientCnxnSocket getClientCnxnSocket() { return clientCnxnSocket; } @@ -1133,7 +1148,7 @@ private void startConnect(InetSocketAddress addr) throws IOException { LOG.warn("Unexpected exception", e); } } - state = States.CONNECTING; + changeZkState(States.CONNECTING); String hostPort = addr.getHostString() + ":" + addr.getPort(); MDC.put("myid", hostPort); @@ -1191,6 +1206,7 @@ public void run() { } else { serverAddress = hostProvider.next(1000); } + onConnecting(serverAddress); startConnect(serverAddress); clientCnxnSocket.updateLastSendAndHeard(); } @@ -1204,7 +1220,7 @@ public void run() { zooKeeperSaslClient.initialize(ClientCnxn.this); } catch (SaslException e) { LOG.error("SASL authentication with Zookeeper Quorum member failed.", e); - state = States.AUTH_FAILED; + changeZkState(States.AUTH_FAILED); sendAuthEvent = true; } } @@ -1212,7 +1228,7 @@ public void run() { if (authState != null) { if (authState == KeeperState.AuthFailed) { // An authentication error occurred during authentication with the Zookeeper Server. - state = States.AUTH_FAILED; + changeZkState(States.AUTH_FAILED); sendAuthEvent = true; } else { if (authState == KeeperState.SaslAuthenticated) { @@ -1406,7 +1422,7 @@ void onConnected( boolean isRO) throws IOException { negotiatedSessionTimeout = _negotiatedSessionTimeout; if (negotiatedSessionTimeout <= 0) { - state = States.CLOSED; + changeZkState(States.CLOSED); eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null)); eventThread.queueEventOfDeath(); @@ -1427,7 +1443,7 @@ void onConnected( hostProvider.onConnected(); sessionId = _sessionId; sessionPasswd = _sessionPasswd; - state = (isRO) ? States.CONNECTEDREADONLY : States.CONNECTED; + changeZkState((isRO) ? States.CONNECTEDREADONLY : States.CONNECTED); seenRwServerBefore |= !isRO; LOG.info( "Session establishment complete on server {}, session id = 0x{}, negotiated timeout = {}{}", @@ -1440,7 +1456,12 @@ void onConnected( } void close() { - state = States.CLOSED; + try { + changeZkState(States.CLOSED); + } catch (IOException e) { + LOG.warn("Connection close fails when migrates state from {} to CLOSED", + getZkState()); + } clientCnxnSocket.onClosing(); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java new file mode 100644 index 00000000000..07b7d622269 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Queue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.zookeeper.ClientCnxn.Packet; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.client.HostProvider; +import org.apache.zookeeper.client.ZKClientConfig; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.quorum.QuorumPeerTestBase; +import org.apache.zookeeper.test.ClientBase; +import org.junit.Assert; +import org.junit.Test; + +public class ClientCnxnSocketFragilityTest extends QuorumPeerTestBase { + + private static final int SERVER_COUNT = 3; + + private static final int SESSION_TIMEOUT = 40000; + + public static final int CONNECTION_TIMEOUT = 30000; + + private final UnsafeCoordinator unsafeCoordinator = new UnsafeCoordinator(); + + private volatile CustomZooKeeper zk = null; + + private volatile FragileClientCnxnSocketNIO socket = null; + + private volatile CustomClientCnxn cnxn = null; + + private String getCxnString(int[] clientPorts) { + StringBuffer hostPortBuffer = new StringBuffer(); + for (int i = 0; i < clientPorts.length; i++) { + hostPortBuffer.append("127.0.0.1:"); + hostPortBuffer.append(clientPorts[i]); + if (i != (clientPorts.length - 1)) { + hostPortBuffer.append(','); + } + } + return hostPortBuffer.toString(); + } + + private void closeZookeeper(ZooKeeper zk) { + Executors.newSingleThreadExecutor().submit(() -> { + try { + LOG.info("closeZookeeper is fired"); + zk.close(); + } catch (InterruptedException e) { + } + }); + } + + @Test + public void testClientCnxnSocketFragility() throws Exception { + System.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, + FragileClientCnxnSocketNIO.class.getName()); + System.setProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT, "1000"); + final int[] clientPorts = new int[SERVER_COUNT]; + StringBuilder sb = new StringBuilder(); + String server; + + for (int i = 0; i < SERVER_COUNT; i++) { + clientPorts[i] = PortAssignment.unique(); + server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + + PortAssignment.unique() + ":participant;127.0.0.1:" + clientPorts[i]; + sb.append(server + "\n"); + } + String currentQuorumCfgSection = sb.toString(); + MainThread[] mt = new MainThread[SERVER_COUNT]; + + for (int i = 0; i < SERVER_COUNT; i++) { + mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false); + mt[i].start(); + } + + // Ensure server started + for (int i = 0; i < SERVER_COUNT; i++) { + Assert.assertTrue("waiting for server " + i + " being up", + ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT)); + } + String path = "/testClientCnxnSocketFragility"; + String data = "balabala"; + ClientWatcher watcher = new ClientWatcher(); + zk = new CustomZooKeeper(getCxnString(clientPorts), SESSION_TIMEOUT, watcher); + watcher.watchFor(zk); + + // Let's see some successful operations + zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + Assert.assertEquals(new String(zk.getData(path, false, new Stat())), data); + Assert.assertTrue(!watcher.isSessionExpired()); + + // Let's make a broken operation + socket.mute(); + boolean catchKeeperException = false; + try { + zk.getData(path, false, new Stat()); + } catch (KeeperException e) { + catchKeeperException = true; + Assert.assertFalse(e instanceof KeeperException.SessionExpiredException); + } + socket.unmute(); + Assert.assertTrue(catchKeeperException); + Assert.assertTrue(!watcher.isSessionExpired()); + + GetDataRetryForeverBackgroundTask retryForeverGetData = + new GetDataRetryForeverBackgroundTask(zk, path); + retryForeverGetData.startTask(); + // Let's make a broken network + socket.mute(); + + // Let's attempt to close ZooKeeper + cnxn.attemptClose(); + + // Wait some time to expect continuous reconnecting. + // We try to make reconnecting hit the unsafe region. + cnxn.waitUntilHitUnsafeRegion(); + + // close zk with timeout 1000 milli seconds + closeZookeeper(zk); + TimeUnit.MILLISECONDS.sleep(3000); + + // Since we already close zookeeper, we expect that the zk should not be alive. + Assert.assertTrue(!zk.isAlive()); + Assert.assertTrue(!watcher.isSessionExpired()); + + retryForeverGetData.syncCloseTask(); + for (int i = 0; i < SERVER_COUNT; i++) { + mt[i].shutdown(); + } + } + + class GetDataRetryForeverBackgroundTask extends Thread { + private volatile boolean alive; + private final CustomZooKeeper zk; + private final String path; + + GetDataRetryForeverBackgroundTask(CustomZooKeeper zk, String path) { + this.alive = false; + this.zk = zk; + this.path = path; + // marked as daemon to avoid exhausting CPU + setDaemon(true); + } + + void startTask() { + alive = true; + start(); + } + + void syncCloseTask() throws InterruptedException { + alive = false; + join(); + } + + @Override + public void run() { + while (alive) { + try { + zk.getData(path, false, new Stat()); + // sleep for a while to avoid exhausting CPU + TimeUnit.MILLISECONDS.sleep(500); + } catch (Exception e) { + LOG.info("zookeeper getData failed on path {}", path); + } + } + } + } + + public static class FragileClientCnxnSocketNIO extends ClientCnxnSocketNIO { + + private volatile boolean mute; + + public FragileClientCnxnSocketNIO(ZKClientConfig clientConfig) throws IOException { + super(clientConfig); + mute = false; + } + + synchronized void mute() { + if (!mute) { + LOG.info("Fire socket mute"); + mute = true; + } + } + + synchronized void unmute() { + if (mute) { + LOG.info("Fire socket unmute"); + mute = false; + } + } + + @Override + void doTransport(int waitTimeOut, Queue pendingQueue, ClientCnxn cnxn) + throws IOException, InterruptedException { + if (mute) { + throw new IOException("Socket is mute"); + } + super.doTransport(waitTimeOut, pendingQueue, cnxn); + } + + @Override + void connect(InetSocketAddress addr) throws IOException { + if (mute) { + throw new IOException("Socket is mute"); + } + super.connect(addr); + } + } + + class ClientWatcher implements Watcher { + + private ZooKeeper zk; + + private boolean sessionExpired = false; + + void watchFor(ZooKeeper zk) { + this.zk = zk; + } + + @Override + public void process(WatchedEvent event) { + LOG.info("Watcher got {}", event); + if (event.getState() == KeeperState.Expired) { + sessionExpired = true; + } + } + + boolean isSessionExpired() { + return sessionExpired; + } + } + + // Coordinate to construct the risky scenario. + class UnsafeCoordinator { + + private CountDownLatch syncLatch = new CountDownLatch(2); + + void sync(boolean closing) { + LOG.info("Attempt to sync with {}", closing); + if (closing) { + syncLatch.countDown(); + try { + syncLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + } + + class CustomClientCnxn extends ClientCnxn { + + private volatile boolean closing = false; + + private volatile boolean hitUnsafeRegion = false; + + public CustomClientCnxn( + String chrootPath, + HostProvider hostProvider, + int sessionTimeout, + ZooKeeper zooKeeper, + ClientWatchManager watcher, + ClientCnxnSocket clientCnxnSocket, + boolean canBeReadOnly) throws IOException { + super(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly); + } + + void attemptClose() { + closing = true; + } + + void waitUntilHitUnsafeRegion() { + while (!hitUnsafeRegion) { + try { + TimeUnit.MILLISECONDS.sleep(100); + } catch (InterruptedException e) { + } + } + } + + @Override + protected void onConnecting(InetSocketAddress addr) { + if (closing) { + LOG.info("Attempt to connnecting {} {} {}", addr, closing, state); + ///////// Unsafe Region //////// + // Slow down and zoom out the unsafe point to make risk + // The unsafe point is that startConnect happens after sendThread.close + hitUnsafeRegion = true; + unsafeCoordinator.sync(closing); + //////////////////////////////// + } + } + + @Override + public void disconnect() { + Assert.assertTrue(closing); + LOG.info("Attempt to disconnecting client for session: 0x{} {} {}", Long.toHexString(getSessionId()), closing, state); + sendThread.close(); + ///////// Unsafe Region //////// + unsafeCoordinator.sync(closing); + //////////////////////////////// + try { + sendThread.join(); + } catch (InterruptedException ex) { + LOG.warn("Got interrupted while waiting for the sender thread to close", ex); + } + eventThread.queueEventOfDeath(); + if (zooKeeperSaslClient != null) { + zooKeeperSaslClient.shutdown(); + } + } + } + + class CustomZooKeeper extends ZooKeeper { + + public CustomZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException { + super(connectString, sessionTimeout, watcher); + } + + public boolean isAlive() { + return cnxn.getState().isAlive(); + } + + @Override + protected ClientCnxn createConnection( + String chrootPath, + HostProvider hostProvider, + int sessionTimeout, + ZooKeeper zooKeeper, + ClientWatchManager watcher, + ClientCnxnSocket clientCnxnSocket, + boolean canBeReadOnly) throws IOException { + Assert.assertTrue(clientCnxnSocket instanceof FragileClientCnxnSocketNIO); + socket = (FragileClientCnxnSocketNIO) clientCnxnSocket; + ClientCnxnSocketFragilityTest.this.cnxn = new CustomClientCnxn(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly); + return ClientCnxnSocketFragilityTest.this.cnxn; + } + } +} \ No newline at end of file