Skip to content

Commit

Permalink
ZOOKEEPER-3706: ZooKeeper.close() would leak SendThread when the netw…
Browse files Browse the repository at this point in the history
…ork is broken

- add unit test to verify the bug
- bypass SendThread.startConnect() by throwing a RuntimeException if state.isAlive() is false

Author: Fangxi Yin <[email protected]>
  • Loading branch information
yinfangxi committed Jan 25, 2020
1 parent 3bd6b19 commit 30c07d2
Show file tree
Hide file tree
Showing 2 changed files with 318 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,10 @@ void queueCallback(AsyncCallback cb, int rc, String path, Object ctx) {
eventThread.queueCallback(cb, rc, path, ctx);
}

/**
 * Hook invoked with the address of the server the client is about to connect to;
 * the connection loop calls it right before {@code startConnect(addr)}.
 * This implementation only logs the address at INFO level. The method is
 * {@code protected} so that subclasses can override it.
 *
 * @param addr address of the server that is about to be connected to
 */
protected void onConnecting(InetSocketAddress addr) {
LOG.info("Connecting server {}.", addr);
}

private void conLossPacket(Packet p) {
if (p.replyHeader == null) {
return;
Expand Down Expand Up @@ -1121,6 +1125,9 @@ private void startConnect(InetSocketAddress addr) throws IOException {
LOG.warn("Unexpected exception", e);
}
}
if (!state.isAlive()) {
throw new RuntimeException("Already closed");
}
state = States.CONNECTING;

String hostPort = addr.getHostString() + ":" + addr.getPort();
Expand Down Expand Up @@ -1179,6 +1186,7 @@ public void run() {
} else {
serverAddress = hostProvider.next(1000);
}
ClientCnxn.this.onConnecting(serverAddress);
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,310 @@
package org.apache.zookeeper;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.ClientCnxn.Packet;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.client.HostProvider;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.RequestHeader;
import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
import org.apache.zookeeper.test.ClientBase;
import org.junit.Assert;
import org.junit.Test;

public class ClientCnxnSocketFragilityTest extends QuorumPeerTestBase {

// Number of quorum servers started by the test.
private static final int SERVER_COUNT = 3;

// Session timeout (ms) passed to the CustomZooKeeper constructor.
private static final int SESSION_TIMEOUT = 40000;

// Maximum time (ms) to wait for each server to come up.
public static final int CONNECTION_TIMEOUT = 30000;

// NOTE(review): not referenced anywhere else in the visible part of this class — confirm whether it is still needed.
private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

// Rendezvous point used by CustomClientCnxn to line up the connecting and the disconnecting threads.
private final UnsafeCoordinator unsafeCoordinator = new UnsafeCoordinator();

// Client under test; assigned in the test method.
private volatile CustomZooKeeper zk = null;

// Socket implementation captured in CustomZooKeeper.createConnection so the test can mute/unmute it.
private volatile FragileClientCnxnSocketNIO socket = null;

// Connection captured in CustomZooKeeper.createConnection so the test can flag it as closing.
private volatile CustomClientCnxn cnxn = null;

/**
 * Builds a ZooKeeper connect string that points at the loopback address for
 * every given client port, e.g. {@code 127.0.0.1:2181,127.0.0.1:2182}.
 *
 * @param clientPorts client ports of the servers to connect to
 * @return comma-separated {@code host:port} list; empty when no ports are given
 */
private String getCxnString(int[] clientPorts) {
    StringBuilder connectString = new StringBuilder();
    for (int port : clientPorts) {
        // Separator goes before every entry except the first one.
        if (connectString.length() > 0) {
            connectString.append(',');
        }
        connectString.append("127.0.0.1:").append(port);
    }
    return connectString.toString();
}

/**
 * Closes the given client on a background thread so that the calling test
 * thread is not blocked if {@code close()} hangs.
 *
 * @param zk client to close
 */
private void closeZookeeper(ZooKeeper zk) {
    ExecutorService closer = Executors.newSingleThreadExecutor();
    try {
        closer.submit(() -> {
            try {
                LOG.info("closeZookeeper is fired");
                zk.close();
            } catch (InterruptedException e) {
                // Keep the interrupt status visible instead of silently dropping it.
                Thread.currentThread().interrupt();
                LOG.warn("Interrupted while closing ZooKeeper client", e);
            }
        });
    } finally {
        // shutdown() still runs the task submitted above but lets the worker
        // thread terminate afterwards instead of lingering forever.
        closer.shutdown();
    }
}

/**
 * Starts a background thread that keeps issuing {@code getData} calls against
 * the given path, ignoring failures, for as long as the client is alive.
 *
 * <p>The thread is a daemon and stops once the client is no longer alive or
 * the thread is interrupted, so it neither outlives the test JVM nor spins
 * on a closed client.
 *
 * @param zk   client used to issue the reads
 * @param path znode path to read
 */
private void getDataBackgroundRetryForever(CustomZooKeeper zk, String path) {
    Thread retrier = new Thread(() -> {
        while (zk.isAlive() && !Thread.currentThread().isInterrupted()) {
            try {
                zk.getData(path, false, new Stat());
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition ends the thread.
                Thread.currentThread().interrupt();
            } catch (Exception ignored) {
                // Failures are expected while the socket is muted; keep retrying.
            }
        }
    }, "getData-background-retry");
    retrier.setDaemon(true);
    retrier.start();
}

/**
 * Reproduces ZOOKEEPER-3706: closing a client while the network is broken
 * must leave the client in a non-alive state and must not report a session
 * expiration.
 *
 * <p>The quorum servers are always shut down and the system properties set
 * here are always cleared, even when an assertion fails, so that the test
 * does not affect tests that run afterwards in the same JVM.
 */
@Test
public void testClientCnxnSocketFragility() throws Exception {
    System.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, FragileClientCnxnSocketNIO.class.getName());
    System.setProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT, "1000");
    final int[] clientPorts = new int[SERVER_COUNT];
    final MainThread[] mt = new MainThread[SERVER_COUNT];
    try {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < SERVER_COUNT; i++) {
            clientPorts[i] = PortAssignment.unique();
            sb.append("server.").append(i)
                .append("=127.0.0.1:").append(PortAssignment.unique())
                .append(':').append(PortAssignment.unique())
                .append(":participant;127.0.0.1:").append(clientPorts[i])
                .append('\n');
        }
        String currentQuorumCfgSection = sb.toString();

        for (int i = 0; i < SERVER_COUNT; i++) {
            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false);
            mt[i].start();
        }

        // Ensure server started
        for (int i = 0; i < SERVER_COUNT; i++) {
            Assert.assertTrue(
                "waiting for server " + i + " being up",
                ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT));
        }
        String path = "/testClientCnxnSocketFragility";
        String data = "balabala";
        ClientWatcher watcher = new ClientWatcher();
        zk = new CustomZooKeeper(getCxnString(clientPorts), SESSION_TIMEOUT, watcher);
        watcher.watchFor(zk);

        // Let's see some successful operations
        zk.create(path, data.getBytes(StandardCharsets.UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Assert.assertEquals(data, new String(zk.getData(path, false, new Stat()), StandardCharsets.UTF_8));
        Assert.assertFalse(watcher.isSessionExpired());

        // Let's make a broken operation
        socket.mute();
        boolean catchKeeperException = false;
        try {
            zk.getData(path, false, new Stat());
        } catch (KeeperException e) {
            catchKeeperException = true;
            Assert.assertFalse(e instanceof KeeperException.SessionExpiredException);
        }
        socket.unmute();
        Assert.assertTrue(catchKeeperException);
        Assert.assertFalse(watcher.isSessionExpired());

        getDataBackgroundRetryForever(zk, path);
        // Let's make a broken network
        socket.mute();

        // Let's attempt to close ZooKeeper
        cnxn.attemptClose();

        // Wait some time to expect continuous reconnecting.
        // We try to make reconnecting hit the unsafe point.
        TimeUnit.MILLISECONDS.sleep(3000);

        // Close zk in the background (the request timeout is 1000 ms) and give it time to finish.
        closeZookeeper(zk);
        TimeUnit.MILLISECONDS.sleep(3000);

        // Since we already close zookeeper, we expect that the zk should not be alive.
        Assert.assertFalse(zk.isAlive());
        Assert.assertFalse(watcher.isSessionExpired());
    } finally {
        try {
            for (MainThread server : mt) {
                // Entries stay null when startup failed part-way through.
                if (server != null) {
                    server.shutdown();
                }
            }
        } finally {
            System.clearProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
            System.clearProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT);
        }
    }
}

/**
 * NIO client socket whose connect and transport operations can be made to
 * fail on demand, which lets a test simulate a broken network.
 */
public static class FragileClientCnxnSocketNIO extends ClientCnxnSocketNIO {

    // While true, connect() and doTransport() fail with an IOException.
    private volatile boolean muted = false;

    public FragileClientCnxnSocketNIO(ZKClientConfig clientConfig) throws IOException {
        super(clientConfig);
    }

    /** Starts failing all socket operations; does nothing when already muted. */
    synchronized void mute() {
        if (muted) {
            return;
        }
        LOG.info("Fire socket mute");
        muted = true;
    }

    /** Lets socket operations through again; does nothing when not muted. */
    synchronized void unmute() {
        if (!muted) {
            return;
        }
        LOG.info("Fire socket unmute");
        muted = false;
    }

    @Override
    void doTransport(int waitTimeOut, Queue<Packet> pendingQueue, ClientCnxn cnxn)
        throws IOException, InterruptedException {
        failIfMuted();
        super.doTransport(waitTimeOut, pendingQueue, cnxn);
    }

    @Override
    void connect(InetSocketAddress addr) throws IOException {
        failIfMuted();
        super.connect(addr);
    }

    /** Throws the simulated network failure while the socket is muted. */
    private void failIfMuted() throws IOException {
        if (muted) {
            throw new IOException("Socket is mute");
        }
    }
}

/**
 * Watcher that logs every event and remembers whether a session expiration
 * has been observed.
 */
class ClientWatcher implements Watcher {

    private ZooKeeper zk;

    // Written by the thread that delivers watch events and read by the test
    // thread, so it must be volatile for the update to be visible.
    private volatile boolean sessionExpired = false;

    /**
     * Records the client this watcher is attached to.
     *
     * @param zk client this watcher was registered with
     */
    void watchFor(ZooKeeper zk) {
        this.zk = zk;
    }

    @Override
    public void process(WatchedEvent event) {
        LOG.info("Watcher got {}", event);
        if (event.getState() == KeeperState.Expired) {
            sessionExpired = true;
        }
    }

    /**
     * @return {@code true} once an event with state {@code Expired} has been received
     */
    boolean isSessionExpired() {
        return sessionExpired;
    }
}

// Coordinate to construct the risky scenario.
/**
 * Coordinates two threads so that the risky scenario can be constructed:
 * each closing caller counts the latch down and then waits until two such
 * calls have been made.
 */
class UnsafeCoordinator {

    private final CountDownLatch syncLatch = new CountDownLatch(2);

    /**
     * Logs the attempt and, when {@code closing} is true, counts the latch
     * down and blocks until the latch reaches zero.
     *
     * @param closing whether the caller takes part in the rendezvous; when
     *                {@code false} the call only logs
     * @throws RuntimeException wrapping the {@link InterruptedException} if
     *                          the wait is interrupted (the interrupt status
     *                          is restored first)
     */
    void sync(boolean closing) {
        LOG.info("Attempt to sync with {}", closing);
        if (!closing) {
            return;
        }
        syncLatch.countDown();
        try {
            syncLatch.await();
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}

/**
 * ClientCnxn that, once flagged as closing, makes the connecting path and
 * the disconnecting path meet at the shared UnsafeCoordinator so that the
 * connect attempt proceeds only after sendThread.close() has been called.
 */
class CustomClientCnxn extends ClientCnxn {

// Set by attemptClose(); enables the rendezvous in onConnecting() and disconnect().
private volatile boolean closing = false;

public CustomClientCnxn(
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
ZooKeeper zooKeeper,
ClientWatchManager watcher,
ClientCnxnSocket clientCnxnSocket,
boolean canBeReadOnly) throws IOException {
super(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly);
}

/** Marks this connection as closing; it does not close anything by itself. */
void attemptClose() {
closing = true;
}

/**
 * When closing, logs the attempt and waits at the coordinator before
 * returning; otherwise does nothing.
 */
@Override
protected void onConnecting(InetSocketAddress addr) {
if (closing) {
LOG.info("Attempt to connnecting {} {} {}", addr, closing, state);
///////// Unsafe Region ////////
// Hold the connecting thread here to widen the race window and make the problem reproducible.
// The unsafe point is that startConnect happens after sendThread.close
unsafeCoordinator.sync(closing);
////////////////////////////////
}
}

/**
 * Closes the send thread, waits at the coordinator, then joins the send
 * thread, queues the event of death and shuts down the SASL client if
 * there is one. Asserts that attemptClose() was called beforehand.
 */
@Override
public void disconnect() {
Assert.assertTrue(closing);
LOG.info("Attempt to disconnecting client for session: 0x{} {} {}", Long.toHexString(getSessionId()), closing, state);
sendThread.close();
///////// Unsafe Region ////////
// Rendezvous with onConnecting() only after sendThread.close() has been called.
unsafeCoordinator.sync(closing);
////////////////////////////////
try {
sendThread.join();
} catch (InterruptedException ex) {
LOG.warn("Got interrupted while waiting for the sender thread to close", ex);
}
eventThread.queueEventOfDeath();
if (zooKeeperSaslClient != null) {
zooKeeperSaslClient.shutdown();
}
}
}

/**
 * ZooKeeper client that builds its connection as a {@code CustomClientCnxn}
 * and publishes both the connection and its socket to the enclosing test.
 */
class CustomZooKeeper extends ZooKeeper {

    public CustomZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
        super(connectString, sessionTimeout, watcher);
    }

    /**
     * @return whether the state of this client's connection is alive
     */
    public boolean isAlive() {
        return cnxn.getState().isAlive();
    }

    /**
     * Creates the connection as a {@code CustomClientCnxn} and stores it,
     * together with the socket, in the enclosing test instance.
     */
    @Override
    protected ClientCnxn createConnection(
        String chrootPath,
        HostProvider hostProvider,
        int sessionTimeout,
        ZooKeeper zooKeeper,
        ClientWatchManager watcher,
        ClientCnxnSocket clientCnxnSocket,
        boolean canBeReadOnly) throws IOException {
        // The test relies on the socket implementation selected through the system property.
        Assert.assertTrue(clientCnxnSocket instanceof FragileClientCnxnSocketNIO);
        ClientCnxnSocketFragilityTest.this.socket = (FragileClientCnxnSocketNIO) clientCnxnSocket;

        CustomClientCnxn connection = new CustomClientCnxn(
            chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly);
        ClientCnxnSocketFragilityTest.this.cnxn = connection;
        return connection;
    }
}
}

0 comments on commit 30c07d2

Please sign in to comment.