Skip to content

Commit

Permalink
ZOOKEEPER-3072: Throttle race condition fix
Browse files Browse the repository at this point in the history
Making the throttle check before passing over the request to the next thread will prevent the possibility of throttling code running after unthrottle

Added an additional async hammer thread which is pretty reliably reproduces the race condition. The globalOutstandingLimit is decreased so throttling code is executed.

Author: Botond Hejj <[email protected]>

Reviewers: Andor Molnár <[email protected]>, Norbert Kalmar <[email protected]>, Benjamin Reed <[email protected]>

Closes #563 from bothejjms/ZOOKEEPER-3072

(cherry picked from commit 2a372fc)
Signed-off-by: Benjamin Reed <[email protected]>
  • Loading branch information
Botond Hejj authored and breed committed Jul 28, 2018
1 parent 7f51e5b commit a859410
Showing 1 changed file with 14 additions and 16 deletions.
30 changes: 14 additions & 16 deletions src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -1114,24 +1114,22 @@ public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOE
cnxn.disableRecv();
}
return;
} else if (h.getType() == OpCode.sasl) {
Record rsp = processSasl(incomingBuffer,cnxn);
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?
return;
} else {
if (h.getType() == OpCode.sasl) {
Record rsp = processSasl(incomingBuffer,cnxn);
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?
return;
}
else {
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
h.getType(), incomingBuffer, cnxn.getAuthInfo());
si.setOwner(ServerCnxn.me);
// Always treat packet from the client as a possible
// local request.
setLocalSessionFlag(si);
submitRequest(si);
}
cnxn.incrOutstandingRequests(h);
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
h.getType(), incomingBuffer, cnxn.getAuthInfo());
si.setOwner(ServerCnxn.me);
// Always treat packet from the client as a possible
// local request.
setLocalSessionFlag(si);
submitRequest(si);
return;
}
cnxn.incrOutstandingRequests(h);
}

private Record processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn) throws IOException {
Expand Down

0 comments on commit a859410

Please sign in to comment.