From 65be7f6ff3291bca090253e3315f47166c2c6e69 Mon Sep 17 00:00:00 2001 From: Botond Hejj Date: Fri, 27 Jul 2018 19:37:08 -0700 Subject: [PATCH] ZOOKEEPER-3072: Throttle race condition fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Making the throttle check before passing over the request to the next thread will prevent the possibility of throttling code running after unthrottle Added an additional async hammer thread which is pretty reliably reproduces the race condition. The globalOutstandingLimit is decreased so throttling code is executed. Author: Botond Hejj Reviewers: Andor Molnár , Norbert Kalmar , Benjamin Reed Closes #563 from bothejjms/ZOOKEEPER-3072 --- .../zookeeper/server/ZooKeeperServer.java | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java index b0e2d64dfb0..9596908cdff 100644 --- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java @@ -1130,24 +1130,22 @@ public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOE cnxn.disableRecv(); } return; + } else if (h.getType() == OpCode.sasl) { + Record rsp = processSasl(incomingBuffer,cnxn); + ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue()); + cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it? + return; } else { - if (h.getType() == OpCode.sasl) { - Record rsp = processSasl(incomingBuffer,cnxn); - ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue()); - cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it? - return; - } - else { - Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), - h.getType(), incomingBuffer, cnxn.getAuthInfo()); - si.setOwner(ServerCnxn.me); - // Always treat packet from the client as a possible - // local request. - setLocalSessionFlag(si); - submitRequest(si); - } + cnxn.incrOutstandingRequests(h); + Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), + h.getType(), incomingBuffer, cnxn.getAuthInfo()); + si.setOwner(ServerCnxn.me); + // Always treat packet from the client as a possible + // local request. + setLocalSessionFlag(si); + submitRequest(si); + return; } - cnxn.incrOutstandingRequests(h); } private Record processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn) throws IOException {