From a796cad7e6d643434652bd5da69aa1635b01049c Mon Sep 17 00:00:00 2001 From: Botond Hejj Date: Fri, 6 Jul 2018 10:43:18 -0400 Subject: [PATCH] ZOOKEEPER-3072: Throttle race condition fix Making the throttle check before passing over the request to the next thread will prevent the possibility of throttling code running after unthrottle --- .../zookeeper/server/ZooKeeperServer.java | 3 +- .../zookeeper/test/ThrottleRaceTest.java | 156 ++++++++++++++++++ 2 files changed, 157 insertions(+), 2 deletions(-) create mode 100644 src/java/test/org/apache/zookeeper/test/ThrottleRaceTest.java diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java index ff5b3b66a90..cfd77a961fa 100644 --- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java @@ -1128,9 +1128,9 @@ public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOE Record rsp = processSasl(incomingBuffer,cnxn); ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue()); cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it? - return; } else { + cnxn.incrOutstandingRequests(h); Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); si.setOwner(ServerCnxn.me); @@ -1140,7 +1140,6 @@ public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOE submitRequest(si); } } - cnxn.incrOutstandingRequests(h); } private Record processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn) throws IOException { diff --git a/src/java/test/org/apache/zookeeper/test/ThrottleRaceTest.java b/src/java/test/org/apache/zookeeper/test/ThrottleRaceTest.java new file mode 100644 index 00000000000..396f8fb2f36 --- /dev/null +++ b/src/java/test/org/apache/zookeeper/test/ThrottleRaceTest.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; +import static org.apache.zookeeper.test.ClientBase.verifyThreadTerminated; + +import org.apache.zookeeper.AsyncCallback.StatCallback; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.TestableZooKeeper; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.test.ClientBase.CountdownWatcher; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ThrottleRaceTest extends ZKTestCase { + private static final Logger LOG = LoggerFactory.getLogger(ThrottleRaceTest.class); + + private QuorumBase qb = new QuorumBase(); + + private volatile boolean bang; + + public void setUp() throws Exception { + qb.setUp(); + } + + public void tearDown() throws Exception { + LOG.info("Test clients shutting down"); + qb.tearDown(); + } + + /** + * Send exists /zookeeper requests asynchronously, max 30 outstanding + */ + class HammerThreadExists extends Thread implements StatCallback { + private static final int MAX_OUTSTANDING = 30; + + private TestableZooKeeper zk; + private int outstanding; + + private volatile boolean failed = false; + + public HammerThreadExists(String name) { + super(name); + } + + public void run() { + try { + CountdownWatcher watcher = new CountdownWatcher(); + zk = new TestableZooKeeper(qb.hostPort, CONNECTION_TIMEOUT, + watcher); + watcher.waitForConnected(CONNECTION_TIMEOUT); + while(bang) { + incOutstanding(); // before create otw race + zk.exists("/zookeeper", false, this, null); + } + } catch (InterruptedException e) { + if (bang) { + LOG.error("sanity check Assert.failed!!!"); // sanity check + return; + } + } catch (Exception e) { + LOG.error("Client create operation Assert.failed", e); + return; + } finally { + if (zk != null) { + try { + if (!zk.close(CONNECTION_TIMEOUT)) { + failed = true; + LOG.error("Client did not shutdown"); + } + } catch (InterruptedException e) { + LOG.info("Interrupted", e); + } + } + } + } + + private synchronized void incOutstanding() throws InterruptedException { + outstanding++; + while(outstanding > MAX_OUTSTANDING) { + wait(); + } + } + + private synchronized void decOutstanding() { + outstanding--; + Assert.assertTrue("outstanding >= 0", outstanding >= 0); + notifyAll(); + } + + public void process(WatchedEvent event) { + // ignore for purposes of this test + } + + public void processResult(int rc, String path, Object ctx, Stat stat) { + if (rc != KeeperException.Code.OK.intValue()) { + if (bang) { + failed = true; + LOG.error("Exists Assert.failed for 0x" + + Long.toHexString(zk.getSessionId()) + + "with rc:" + rc + " path:" + path); + } + decOutstanding(); + return; + } + decOutstanding(); + } + } + + @Test + public void testExistsHammer() throws Exception { + System.setProperty("zookeeper.globalOutstandingLimit", "1"); + setUp(); + bang = true; + LOG.info("Starting hammers"); + HammerThreadExists[] hammers = new HammerThreadExists[100]; + for (int i = 0; i < hammers.length; i++) { + hammers[i] = new HammerThreadExists("HammerThread-" + i); + hammers[i].start(); + } + LOG.info("Started hammers"); + Thread.sleep(30000); // allow the clients to run for max 5sec + bang = false; + LOG.info("Stopping hammers"); + for (int i = 0; i < hammers.length; i++) { + hammers[i].interrupt(); + verifyThreadTerminated(hammers[i], 60000); + Assert.assertFalse(hammers[i].failed); + } + System.setProperty("zookeeper.globalOutstandingLimit", "1000"); + tearDown(); + } +}