Skip to content

Commit

Permalink
ZOOKEEPER-3706: ZooKeeper.close() would leak SendThread when the netw…
Browse files Browse the repository at this point in the history
…ork is broken

- add unit test to verify the bug
- bypass SendThread.startConnect() by throwing an exception (an IOException in this diff) if state.isAlive() is false

Author: Fangxi Yin <yinfangxikuaishou.com>

Author: yinfangxi <[email protected]>

Reviewers: Michael Han <[email protected]>, Enrico Olivelli <[email protected]>, maoling

Closes #1235 from yfxhust/ZOOKEEPER-3706
  • Loading branch information
yinfangxi authored and Mate Szalay-Beko committed May 17, 2022
1 parent aa20b0e commit af44dab
Show file tree
Hide file tree
Showing 2 changed files with 403 additions and 25 deletions.
69 changes: 44 additions & 25 deletions zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ static class AuthData {
* operation)
*/
private volatile boolean closing = false;

/**
* A set of ZooKeeper hosts this client could connect to.
*/
Expand Down Expand Up @@ -379,11 +379,11 @@ public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeo
* @param canBeReadOnly
* whether the connection is allowed to go to read-only
* mode in case of partitioning
* @throws IOException
* @throws IOException in cases of broken network
*/
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) throws IOException {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
Expand Down Expand Up @@ -649,7 +649,7 @@ private void processEvent(Object event) {
.substring(chrootPath.length())), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
}
} else if (p.response instanceof MultiResponse) {
MultiCallback cb = (MultiCallback) p.cb;
MultiResponse rsp = (MultiResponse) p.response;
Expand Down Expand Up @@ -735,6 +735,11 @@ void queueCallback(AsyncCallback cb, int rc, String path, Object ctx) {
eventThread.queueCallback(cb, rc, path, ctx);
}

/**
 * Test-only hook. This implementation does nothing; it is {@code protected}
 * so that a subclass can override it.
 *
 * <p>It is called from {@code run()} with the selected server address
 * immediately before {@code startConnect(serverAddress)}.
 *
 * @param addr the server address about to be passed to {@code startConnect}
 */
protected void onConnecting(InetSocketAddress addr) {

}

private void conLossPacket(Packet p) {
if (p.replyHeader == null) {
return;
Expand Down Expand Up @@ -764,7 +769,7 @@ static class EndOfStreamException extends IOException {
/**
 * Creates the exception with the given detail message.
 *
 * @param msg detail message, passed through to the superclass constructor
 */
public EndOfStreamException(String msg) {
super(msg);
}

@Override
public String toString() {
return "EndOfStreamException: " + getMessage();
Expand All @@ -778,7 +783,7 @@ public SessionTimeoutException(String msg) {
super(msg);
}
}

private static class SessionExpiredException extends IOException {
private static final long serialVersionUID = -1388816932076193249L;

Expand Down Expand Up @@ -826,10 +831,10 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException {
return;
}
if (replyHdr.getXid() == -4) {
// -4 is the xid for AuthPacket
// -4 is the xid for AuthPacket
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
state = States.AUTH_FAILED;
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
changeZkState(States.AUTH_FAILED);
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) );
eventThread.queueEventOfDeath();
}
Expand Down Expand Up @@ -927,9 +932,9 @@ else if (serverPath.length() > chrootPath.length())
}
}

SendThread(ClientCnxnSocket clientCnxnSocket) {
SendThread(ClientCnxnSocket clientCnxnSocket) throws IOException {
super(makeThreadName("-SendThread()"));
state = States.CONNECTING;
changeZkState(States.CONNECTING);
this.clientCnxnSocket = clientCnxnSocket;
setDaemon(true);
}
Expand All @@ -940,13 +945,22 @@ else if (serverPath.length() > chrootPath.length())
// Runnable
/**
* Used by ClientCnxnSocket
*
*
* @return the current client state
*/
ZooKeeper.States getZkState() {
synchronized ZooKeeper.States getZkState() {
return state;
}

/**
 * Switches the client to {@code newState}, refusing a move to
 * {@code States.CONNECTING} once the current state is no longer alive.
 *
 * @param newState the state to switch to
 * @throws IOException if the current state is not alive and {@code newState}
 *         is {@code States.CONNECTING}
 */
synchronized void changeZkState(ZooKeeper.States newState) throws IOException {
    final boolean alreadyDead = !state.isAlive();
    if (alreadyDead) {
        if (newState == States.CONNECTING) {
            throw new IOException(
                "Connection has already been closed and reconnection is not allowed");
        }
    }
    // Assign only after the guard, so a rejected transition leaves the state untouched.
    state = newState;
}

/**
 * Returns the value of the {@code clientCnxnSocket} field.
 *
 * @return the socket wrapper held in {@code clientCnxnSocket}
 */
ClientCnxnSocket getClientCnxnSocket() {
return clientCnxnSocket;
}
Expand Down Expand Up @@ -1073,7 +1087,7 @@ private void startConnect(InetSocketAddress addr) throws IOException {
LOG.warn("Unexpected exception", e);
}
}
state = States.CONNECTING;
changeZkState(States.CONNECTING);

String hostPort = addr.getHostString() + ":" + addr.getPort();
MDC.put("myid", hostPort);
Expand Down Expand Up @@ -1136,6 +1150,7 @@ public void run() {
} else {
serverAddress = hostProvider.next(1000);
}
onConnecting(serverAddress);
startConnect(serverAddress);
// Update now to start the connection timer right after we make a connection attempt
clientCnxnSocket.updateNow();
Expand All @@ -1150,16 +1165,16 @@ public void run() {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
state = States.AUTH_FAILED;
LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);
changeZkState(States.AUTH_FAILED);
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
changeZkState(States.AUTH_FAILED);
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
Expand All @@ -1181,7 +1196,7 @@ public void run() {
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}

if (to <= 0) {
String warnInfo;
warnInfo = "Client session timed out, have not heard from server in "
Expand All @@ -1194,8 +1209,8 @@ public void run() {
}
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
Expand Down Expand Up @@ -1363,7 +1378,7 @@ private void cleanup() {
/**
* Callback invoked by the ClientCnxnSocket once a connection has been
* established.
*
*
* @param _negotiatedSessionTimeout
* @param _sessionId
* @param _sessionPasswd
Expand All @@ -1374,7 +1389,7 @@ void onConnected(int _negotiatedSessionTimeout, long _sessionId,
byte[] _sessionPasswd, boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout;
if (negotiatedSessionTimeout <= 0) {
state = States.CLOSED;
changeZkState(States.CLOSED);

eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Expand All @@ -1395,8 +1410,7 @@ void onConnected(int _negotiatedSessionTimeout, long _sessionId,
hostProvider.onConnected();
sessionId = _sessionId;
sessionPasswd = _sessionPasswd;
state = (isRO) ?
States.CONNECTEDREADONLY : States.CONNECTED;
changeZkState((isRO) ? States.CONNECTEDREADONLY : States.CONNECTED);
seenRwServerBefore |= !isRO;
LOG.info("Session establishment complete on server "
+ clientCnxnSocket.getRemoteSocketAddress()
Expand All @@ -1411,7 +1425,12 @@ void onConnected(int _negotiatedSessionTimeout, long _sessionId,
}

/**
 * Moves the client state to {@code States.CLOSED} and then calls
 * {@code clientCnxnSocket.onClosing()}.
 */
void close() {
// NOTE(review): this direct assignment looks like the pre-change side of the
// diff hunk, shown next to the try block that replaces it; confirm against
// the actual file before relying on both statements being present.
state = States.CLOSED;
try {
changeZkState(States.CLOSED);
} catch (IOException e) {
// changeZkState declares IOException, but its visible implementation only
// throws when the requested state is CONNECTING, so this branch is not
// expected for a transition to CLOSED. The failure is logged, not rethrown.
LOG.warn("Connection close fails when migrates state from {} to CLOSED",
getZkState());
}
clientCnxnSocket.onClosing();
}

Expand Down
Loading

0 comments on commit af44dab

Please sign in to comment.