Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZOOKEEPER-3706: ZooKeeper.close() would leak SendThread when the netw… #1235

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 32 additions & 11 deletions zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ public ClientCnxn(
* @param canBeReadOnly
* whether the connection is allowed to go to read-only
* mode in case of partitioning
* @throws IOException in cases of broken network
*/
public ClientCnxn(
String chrootPath,
Expand All @@ -427,7 +428,7 @@ public ClientCnxn(
ClientCnxnSocket clientCnxnSocket,
long sessionId,
byte[] sessionPasswd,
boolean canBeReadOnly) {
boolean canBeReadOnly) throws IOException {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
Expand Down Expand Up @@ -782,6 +783,11 @@ void queueCallback(AsyncCallback cb, int rc, String path, Object ctx) {
eventThread.queueCallback(cb, rc, path, ctx);
}

// for test only
protected void onConnecting(InetSocketAddress addr) {

}

yfxhust marked this conversation as resolved.
Show resolved Hide resolved
private void conLossPacket(Packet p) {
if (p.replyHeader == null) {
return;
Expand Down Expand Up @@ -876,7 +882,7 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException {
case AUTHPACKET_XID:
LOG.debug("Got auth session id: 0x{}", Long.toHexString(sessionId));
if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
state = States.AUTH_FAILED;
changeZkState(States.AUTH_FAILED);
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
eventThread.queueEventOfDeath();
Expand Down Expand Up @@ -955,9 +961,9 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException {
}
}

SendThread(ClientCnxnSocket clientCnxnSocket) {
SendThread(ClientCnxnSocket clientCnxnSocket) throws IOException {
super(makeThreadName("-SendThread()"));
state = States.CONNECTING;
changeZkState(States.CONNECTING);
this.clientCnxnSocket = clientCnxnSocket;
setDaemon(true);
}
Expand All @@ -971,10 +977,19 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException {
*
* @return
*/
ZooKeeper.States getZkState() {
synchronized ZooKeeper.States getZkState() {
return state;
yfxhust marked this conversation as resolved.
Show resolved Hide resolved
}

synchronized void changeZkState(ZooKeeper.States newState) throws IOException {
if (!state.isAlive() && newState == States.CONNECTING) {
throw new IOException(
"Connection has already been closed and reconnection is not allowed");
}
// It's safer to place state modification at the end.
state = newState;
}

ClientCnxnSocket getClientCnxnSocket() {
return clientCnxnSocket;
}
Expand Down Expand Up @@ -1121,7 +1136,7 @@ private void startConnect(InetSocketAddress addr) throws IOException {
LOG.warn("Unexpected exception", e);
}
}
state = States.CONNECTING;
changeZkState(States.CONNECTING);

String hostPort = addr.getHostString() + ":" + addr.getPort();
MDC.put("myid", hostPort);
Expand Down Expand Up @@ -1179,6 +1194,7 @@ public void run() {
} else {
serverAddress = hostProvider.next(1000);
}
onConnecting(serverAddress);
startConnect(serverAddress);
yfxhust marked this conversation as resolved.
Show resolved Hide resolved
clientCnxnSocket.updateLastSendAndHeard();
}
Expand All @@ -1192,15 +1208,15 @@ public void run() {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);
state = States.AUTH_FAILED;
changeZkState(States.AUTH_FAILED);
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
changeZkState(States.AUTH_FAILED);
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
Expand Down Expand Up @@ -1394,7 +1410,7 @@ void onConnected(
boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout;
if (negotiatedSessionTimeout <= 0) {
state = States.CLOSED;
changeZkState(States.CLOSED);

eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null));
eventThread.queueEventOfDeath();
Expand All @@ -1415,7 +1431,7 @@ void onConnected(
hostProvider.onConnected();
sessionId = _sessionId;
sessionPasswd = _sessionPasswd;
state = (isRO) ? States.CONNECTEDREADONLY : States.CONNECTED;
changeZkState((isRO) ? States.CONNECTEDREADONLY : States.CONNECTED);
seenRwServerBefore |= !isRO;
LOG.info(
"Session establishment complete on server {}, session id = 0x{}, negotiated timeout = {}{}",
Expand All @@ -1428,7 +1444,12 @@ void onConnected(
}

void close() {
state = States.CLOSED;
try {
changeZkState(States.CLOSED);
} catch (IOException e) {
yfxhust marked this conversation as resolved.
Show resolved Hide resolved
LOG.warn("Connection close fails when migrates state from {} to CLOSED",
getZkState());
}
clientCnxnSocket.onClosing();
}

Expand Down
Loading