acls) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index d0fb7da4d2e..43a68ac7aa2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -100,6 +100,8 @@ public Request(long sessionId, int xid, int type, TxnHeader hdr, Record txn, lon
public long syncQueueStartTime;
+ public long requestThrottleQueueTime;
+
private Object owner;
private KeeperException e;
@@ -108,6 +110,22 @@ public Request(long sessionId, int xid, int type, TxnHeader hdr, Record txn, lon
private TxnDigest txnDigest;
+ private boolean isThrottledFlag = false;
+
+ public boolean isThrottled() {
+ return isThrottledFlag;
+ }
+
+ public void setIsThrottled(boolean val) {
+ isThrottledFlag = val;
+ }
+
+ public boolean isThrottlable() {
+ return this.type != OpCode.ping
+ && this.type != OpCode.closeSession
+ && this.type != OpCode.createSession;
+ }
+
/**
* If this is a create or close request for a local-only session.
*/
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
index 16a1c6f1517..e9cdc5ec10b 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
@@ -20,6 +20,7 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.util.ServiceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,6 +98,13 @@ public class RequestThrottler extends ZooKeeperCriticalThread {
*/
private static volatile boolean dropStaleRequests = Boolean.parseBoolean(System.getProperty("zookeeper.request_throttle_drop_stale", "true"));
+ protected boolean shouldThrottleOp(Request request, long elapsedTime) {
+ return request.isThrottlable()
+ && zks.getThrottledOpWaitTime() > 0
+ && elapsedTime > zks.getThrottledOpWaitTime();
+ }
+
+
public RequestThrottler(ZooKeeperServer zks) {
super("RequestThrottler", zks.getZooKeeperServerListener());
this.zks = zks;
@@ -171,6 +179,12 @@ public void run() {
if (request.isStale()) {
ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
}
+ final long elapsedTime = Time.currentElapsedTime() - request.requestThrottleQueueTime;
+ ServerMetrics.getMetrics().REQUEST_THROTTLE_QUEUE_TIME.add(elapsedTime);
+ if (shouldThrottleOp(request, elapsedTime)) {
+ request.setIsThrottled(true);
+ ServerMetrics.getMetrics().THROTTLED_OPS.add(1);
+ }
zks.submitRequestNow(request);
}
}
@@ -230,6 +244,7 @@ public void submitRequest(Request request) {
LOG.debug("Shutdown in progress. Request cannot be processed");
dropRequest(request);
} else {
+ request.requestThrottleQueueTime = Time.currentElapsedTime();
submittedRequests.add(request);
}
}
@@ -238,7 +253,6 @@ public int getInflight() {
return submittedRequests.size();
}
- @SuppressFBWarnings("DM_EXIT")
public void shutdown() {
// Try to shutdown gracefully
LOG.info("Shutting down");
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index cbdb2347d6f..95214209c01 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -154,6 +154,8 @@ private ServerMetrics(MetricsProvider metricsProvider) {
READS_ISSUED_IN_COMMIT_PROC = metricsContext.getSummary("read_commit_proc_issued", DetailLevel.BASIC);
WRITES_ISSUED_IN_COMMIT_PROC = metricsContext.getSummary("write_commit_proc_issued", DetailLevel.BASIC);
+ THROTTLED_OPS = metricsContext.getCounter("throttled_ops");
+
/**
* Time spent by a read request in the commit processor.
*/
@@ -223,6 +225,7 @@ private ServerMetrics(MetricsProvider metricsProvider) {
STALE_REQUESTS = metricsContext.getCounter("stale_requests");
STALE_REQUESTS_DROPPED = metricsContext.getCounter("stale_requests_dropped");
STALE_REPLIES = metricsContext.getCounter("stale_replies");
+ REQUEST_THROTTLE_QUEUE_TIME = metricsContext.getSummary("request_throttle_queue_time_ms", DetailLevel.ADVANCED);
REQUEST_THROTTLE_WAIT_COUNT = metricsContext.getCounter("request_throttle_wait_count");
LARGE_REQUESTS_REJECTED = metricsContext.getCounter("large_requests_rejected");
@@ -381,6 +384,9 @@ private ServerMetrics(MetricsProvider metricsProvider) {
public final Summary READS_ISSUED_IN_COMMIT_PROC;
public final Summary WRITES_ISSUED_IN_COMMIT_PROC;
+ // Request op throttling related
+ public final Counter THROTTLED_OPS;
+
/**
* Time spent by a read request in the commit processor.
*/
@@ -435,6 +441,7 @@ private ServerMetrics(MetricsProvider metricsProvider) {
public final Counter STALE_REQUESTS;
public final Counter STALE_REQUESTS_DROPPED;
public final Counter STALE_REPLIES;
+ public final Summary REQUEST_THROTTLE_QUEUE_TIME;
public final Counter REQUEST_THROTTLE_WAIT_COUNT;
public final Counter LARGE_REQUESTS_REJECTED;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
index 66e85b14a2d..4df319f86a4 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
@@ -178,7 +178,7 @@ public void run() {
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);
// track the number of records written to the log
- if (zks.getZKDatabase().append(si)) {
+ if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
if (shouldSnapshot()) {
resetSnapshotStats();
// roll the log
@@ -202,9 +202,8 @@ public void run() {
}
} else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
- // iff this is a read, and there are no pending
- // flushes (writes), then just pass this to the next
- // processor
+ // iff this is a read or a throttled request(which doesn't need to be written to the disk),
+ // and there are no pending flushes (writes), then just pass this to the next processor
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 1a2d9a7e06a..8205f741499 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -155,6 +155,9 @@ public static void setCloseSessionTxnEnabled(boolean enabled) {
public static final int DEFAULT_TICK_TIME = 3000;
protected int tickTime = DEFAULT_TICK_TIME;
+ public static final int DEFAULT_THROTTLED_OP_WAIT_TIME = 0; // disabled
+ protected static volatile int throttledOpWaitTime =
+ Integer.getInteger("zookeeper.throttled_op_wait_time", DEFAULT_THROTTLED_OP_WAIT_TIME);
/** value of -1 indicates unset, use default */
protected int minSessionTimeout = -1;
/** value of -1 indicates unset, use default */
@@ -1237,6 +1240,15 @@ public void setTickTime(int tickTime) {
this.tickTime = tickTime;
}
+ public static int getThrottledOpWaitTime() {
+ return throttledOpWaitTime;
+ }
+
+ public static void setThrottledOpWaitTime(int time) {
+ LOG.info("throttledOpWaitTime set to {}", time);
+ throttledOpWaitTime = time;
+ }
+
public int getMinSessionTimeout() {
return minSessionTimeout;
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
index bd9b6432440..17dd48b7505 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
@@ -315,6 +315,16 @@ public void setFlushDelay(long delay) {
// Request throttling settings
///////////////////////////////////////////////////////////////////////////
+ public int getThrottledOpWaitTime() {
+ return zks.getThrottledOpWaitTime();
+ }
+
+ public void setThrottledOpWaitTime(int val) {
+ zks.setThrottledOpWaitTime(val);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+
public int getRequestThrottleLimit() {
return RequestThrottler.getMaxRequests();
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
index 71a9d987e04..851fc564a09 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
@@ -136,6 +136,9 @@ public interface ZooKeeperServerMXBean {
boolean getRequestThrottleDropStale();
void setRequestThrottleDropStale(boolean drop);
+ int getThrottledOpWaitTime();
+ void setThrottledOpWaitTime(int val);
+
boolean getRequestStaleLatencyCheck();
void setRequestStaleLatencyCheck(boolean check);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
index 01f9f0d2738..86dce2b7f85 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
@@ -29,12 +29,14 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.server.ExitCode;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.WorkerService;
import org.apache.zookeeper.server.ZooKeeperCriticalThread;
import org.apache.zookeeper.server.ZooKeeperServerListener;
+import org.apache.zookeeper.util.ServiceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -165,6 +167,9 @@ private boolean isProcessingRequest() {
}
protected boolean needCommit(Request request) {
+ if (request.isThrottled()) {
+ return false;
+ }
switch (request.type) {
case OpCode.create:
case OpCode.create2:
@@ -306,6 +311,11 @@ public void run() {
// Process committed head
request = committedRequests.peek();
+ if (request.isThrottled()) {
+ LOG.error("Throttled request in committed pool: {}. Exiting.", request);
+ ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
+ }
+
/*
* Check if this is a local write request is pending,
* if so, update it with the committed info. If the commit matches
@@ -349,6 +359,10 @@ public void run() {
topPending.zxid = request.zxid;
topPending.commitRecvTime = request.commitRecvTime;
request = topPending;
+ if (request.isThrottled()) {
+ LOG.error("Throttled request in committed & pending pool: {}. Exiting.", request);
+ ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
+ }
// Only decrement if we take a request off the queue.
numWriteQueuedRequests.decrementAndGet();
queuedWriteRequests.poll();
@@ -452,7 +466,8 @@ public void start() {
*/
private void sendToNextProcessor(Request request) {
numRequestsProcessing.incrementAndGet();
- workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
+ CommitWorkRequest workRequest = new CommitWorkRequest(request);
+ workerPool.schedule(workRequest, request.sessionId);
}
private void processWrite(Request request) throws RequestProcessorException {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
index 88144de3602..db51aee495d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
@@ -73,6 +73,9 @@ public void run() {
// the request to the leader so that we are ready to receive
// the response
nextProcessor.processRequest(request);
+ if (request.isThrottled()) {
+ continue;
+ }
// We now ship the request to the leader. As with all
// other quorum operations, sync also follows this code
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
index 5371bea91d9..0eb3722a47c 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
@@ -56,6 +56,7 @@
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.server.ExitCode;
import org.apache.zookeeper.server.FinalRequestProcessor;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
@@ -68,6 +69,7 @@
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.server.util.ZxidUtils;
+import org.apache.zookeeper.util.ServiceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1218,6 +1220,10 @@ public XidRolloverException(String message) {
* @return the proposal that is queued to send to all the members
*/
public Proposal propose(Request request) throws XidRolloverException {
+ if (request.isThrottled()) {
+ LOG.error("Throttled request send as proposal: {}. Exiting.", request);
+ ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
+ }
/**
* Address the rollover issue. All lower 32bits set indicate a new leader
* election. Force a re-election instead. See ZOOKEEPER-1277
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 8a0cac17ad6..da5f1132ada 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -234,6 +234,10 @@ void readPacket(QuorumPacket pp) throws IOException {
* @throws IOException
*/
void request(Request request) throws IOException {
+ if (request.isThrottled()) {
+ LOG.error("Throttled request sent to leader: {}. Exiting", request);
+ ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
+ }
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream oa = new DataOutputStream(baos);
oa.writeLong(request.sessionId);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
index 46662032951..0075ce404a8 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
@@ -82,6 +82,10 @@ public void run() {
// the response
nextProcessor.processRequest(request);
+ if (request.isThrottled()) {
+ continue;
+ }
+
// We now ship the request to the leader. As with all
// other quorum operations, sync also follows this code
// path, but different from others, we need to keep track
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
index c6cd93b4d18..6fdea82751e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
@@ -60,6 +60,10 @@ protected void startSessionTracker() {
}
public Request checkUpgradeSession(Request request) throws IOException, KeeperException {
+ if (request.isThrottled()) {
+ return null;
+ }
+
// If this is a request for a local session and it is to
// create an ephemeral node, then upgrade the session and return
// a new session request for the leader.
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
index b91aa19e915..638275a5415 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
@@ -247,6 +247,66 @@ public int getLeaderIndex() {
return -1;
}
+ public int getLeaderClientPort() {
+ if (s1.getPeerState() == ServerState.LEADING) {
+ return portClient1;
+ } else if (s2.getPeerState() == ServerState.LEADING) {
+ return portClient2;
+ } else if (s3.getPeerState() == ServerState.LEADING) {
+ return portClient3;
+ } else if (s4.getPeerState() == ServerState.LEADING) {
+ return portClient4;
+ } else if (s5.getPeerState() == ServerState.LEADING) {
+ return portClient5;
+ }
+ return -1;
+ }
+
+ public QuorumPeer getLeaderQuorumPeer() {
+ if (s1.getPeerState() == ServerState.LEADING) {
+ return s1;
+ } else if (s2.getPeerState() == ServerState.LEADING) {
+ return s2;
+ } else if (s3.getPeerState() == ServerState.LEADING) {
+ return s3;
+ } else if (s4.getPeerState() == ServerState.LEADING) {
+ return s4;
+ } else if (s5.getPeerState() == ServerState.LEADING) {
+ return s5;
+ }
+ return null;
+ }
+
+ public QuorumPeer getFirstObserver() {
+ if (s1.getLearnerType() == LearnerType.OBSERVER) {
+ return s1;
+ } else if (s2.getLearnerType() == LearnerType.OBSERVER) {
+ return s2;
+ } else if (s3.getLearnerType() == LearnerType.OBSERVER) {
+ return s3;
+ } else if (s4.getLearnerType() == LearnerType.OBSERVER) {
+ return s4;
+ } else if (s5.getLearnerType() == LearnerType.OBSERVER) {
+ return s5;
+ }
+ return null;
+ }
+
+ public int getFirstObserverClientPort() {
+ if (s1.getLearnerType() == LearnerType.OBSERVER) {
+ return portClient1;
+ } else if (s2.getLearnerType() == LearnerType.OBSERVER) {
+ return portClient2;
+ } else if (s3.getLearnerType() == LearnerType.OBSERVER) {
+ return portClient3;
+ } else if (s4.getLearnerType() == LearnerType.OBSERVER) {
+ return portClient4;
+ } else if (s5.getLearnerType() == LearnerType.OBSERVER) {
+ return portClient5;
+ }
+ return -1;
+ }
+
public String getPeersMatching(ServerState state) {
StringBuilder hosts = new StringBuilder();
for (QuorumPeer p : getPeerList()) {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpHelper.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpHelper.java
new file mode 100644
index 00000000000..1d9e502257b
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpHelper.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import mockit.Mock;
+import mockit.MockUp;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestThrottler;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThrottledOpHelper {
+ protected static final Logger LOG = LoggerFactory.getLogger(ThrottledOpHelper.class);
+
+ public static final class RequestThrottleMock extends MockUp {
+ public static void throttleEveryNthOp(int n) {
+ everyNthOp = n;
+ opCounter = 0;
+ }
+ private static int everyNthOp = 0;
+ private static int opCounter = 0;
+
+ @Mock
+ private boolean shouldThrottleOp(Request request, long elapsedTime) {
+ if (everyNthOp > 0 && request.isThrottlable() && (++opCounter % everyNthOp == 0)) {
+ opCounter %= everyNthOp;
+ return true;
+ }
+ return false;
+ }
+ }
+
+ public static void applyMockUps() {
+ new RequestThrottleMock();
+ }
+
+ public void testThrottledOp(ZooKeeper zk, ZooKeeperServer zs) throws IOException, InterruptedException, KeeperException {
+ final int N = 5; // must be greater than 3
+ final int COUNT = 100;
+ RequestThrottleMock.throttleEveryNthOp(N);
+ LOG.info("Before create /ivailo nodes");
+ int opCount = 0;
+ for (int i = 0; i < COUNT; i++) {
+ String nodeName = "/ivailo" + i;
+ if (opCount % N == N - 1) {
+ try {
+ zk.create(nodeName, "".getBytes(), Ids.OPEN_ACL_UNSAFE,
+ (i % 2 == 0) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL);
+ Assert.fail("Should have gotten ThrottledOp exception");
+ } catch (KeeperException.ThrottledOpException e) {
+ // anticipated outcome
+ Stat stat = zk.exists(nodeName, null);
+ Assert.assertNull(stat);
+ zk.create(nodeName, "".getBytes(), Ids.OPEN_ACL_UNSAFE,
+ (i % 2 == 0) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL);
+ } catch (KeeperException e) {
+ Assert.fail("Should have gotten ThrottledOp exception");
+ }
+ opCount += 3; // three ops issued
+ } else {
+ zk.create(nodeName, "".getBytes(), Ids.OPEN_ACL_UNSAFE,
+ (i % 2 == 0) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL);
+ opCount++; // one op issued
+ }
+ if (opCount % N == N - 1) {
+ try {
+ zk.setData(nodeName, nodeName.getBytes(), -1);
+ Assert.fail("Should have gotten ThrottledOp exception");
+ } catch (KeeperException.ThrottledOpException e) {
+ // anticipated outcome & retry
+ zk.setData(nodeName, nodeName.getBytes(), -1);
+ } catch (KeeperException e) {
+ Assert.fail("Should have gotten ThrottledOp exception");
+ }
+ opCount += 2; // two ops issued, one for retry
+ } else {
+ zk.setData(nodeName, nodeName.getBytes(), -1);
+ opCount++; // one op issued
+ }
+ }
+ LOG.info("Before delete /ivailo nodes");
+ for (int i = 0; i < COUNT; i++) {
+ String nodeName = "/ivailo" + i;
+ if (opCount % N == N - 1) {
+ try {
+ zk.exists(nodeName, null);
+ Assert.fail("Should have gotten ThrottledOp exception");
+ } catch (KeeperException.ThrottledOpException e) {
+ // anticipated outcome & retry
+ Stat stat = zk.exists(nodeName, null);
+ Assert.assertNotNull(stat);
+ opCount += 2; // two ops issued, one is retry
+ } catch (KeeperException e) {
+ Assert.fail("Should have gotten ThrottledOp exception");
+ }
+ } else {
+ Stat stat = zk.exists(nodeName, null);
+ Assert.assertNotNull(stat);
+ opCount++;
+ }
+ if (opCount % N == N - 1) {
+ try {
+ zk.getData(nodeName, null, null);
+ Assert.fail("Should have gotten ThrottledOp exception");
+ } catch (KeeperException.ThrottledOpException e) {
+ // anticipated outcome & retry
+ byte[] data = zk.getData(nodeName, null, null);
+ Assert.assertEquals(nodeName, new String(data));
+ opCount += 2; // two ops issued, one is retry
+ } catch (KeeperException e) {
+ Assert.fail("Should have gotten ThrottledOp exception");
+ }
+ } else {
+ byte[] data = zk.getData(nodeName, null, null);
+ Assert.assertEquals(nodeName, new String(data));
+ opCount++;
+ }
+ if (opCount % N == N - 1) {
+ try {
+ // version 0 should not trigger BadVersion exception
+ zk.delete(nodeName, 0);
+ Assert.fail("Should have gotten ThrottledOp exception");
+ } catch (KeeperException.ThrottledOpException e) {
+ // anticipated outcome & retry
+ zk.delete(nodeName, -1);
+ } catch (KeeperException e) {
+ Assert.fail("Should have gotten ThrottledOp exception");
+ }
+ opCount += 2; // two ops issues, one for retry
+ } else {
+ zk.delete(nodeName, -1);
+ opCount++; // one op only issued
+ }
+ if (opCount % N == N - 1) {
+ try {
+ zk.exists(nodeName, null);
+ Assert.fail("Should have gotten ThrottledOp exception");
+ } catch (KeeperException.ThrottledOpException e) {
+ // anticipated outcome & retry
+ Stat stat = zk.exists(nodeName, null);
+ Assert.assertNull(stat);
+ opCount += 2; // two ops issued, one is retry
+ } catch (KeeperException e) {
+ Assert.fail("Should have gotten ThrottledOp exception");
+ }
+ } else {
+ Stat stat = zk.exists(nodeName, null);
+ Assert.assertNull(stat);
+ opCount++;
+ }
+ }
+ LOG.info("After delete /ivailo");
+ zk.close();
+ }
+
+ public void testThrottledAcl(ZooKeeper zk, ZooKeeperServer zs) throws Exception {
+ RequestThrottleMock.throttleEveryNthOp(0);
+
+ final ArrayList ACL_PERMS =
+ new ArrayList() { {
+ add(new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE));
+ add(new ACL(ZooDefs.Perms.WRITE, ZooDefs.Ids.ANYONE_ID_UNSAFE));
+ add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS));
+ }};
+ String path = "/path1";
+ zk.create(path, path.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.addAuthInfo("digest", "pat:test".getBytes());
+ List defaultAcls = zk.getACL(path, null);
+ Assert.assertEquals(1, defaultAcls.size());
+
+ RequestThrottleMock.throttleEveryNthOp(2);
+
+ path = "/path2";
+ zk.create(path, path.getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ try {
+ zk.setACL(path, ACL_PERMS, -1);
+ Assert.fail("Should have gotten ThrottledOp exception");
+ } catch (KeeperException.ThrottledOpException e) {
+ // expected
+ } catch (KeeperException e) {
+ Assert.fail("Should have gotten ThrottledOp exception");
+ }
+ List acls = zk.getACL(path, null);
+ Assert.assertEquals(1, acls.size());
+
+ RequestThrottleMock.throttleEveryNthOp(0);
+
+ path = "/path3";
+ zk.create(path, path.getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ zk.setACL(path, ACL_PERMS, -1);
+ acls = zk.getACL(path, null);
+ Assert.assertEquals(3, acls.size());
+ }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpObserverTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpObserverTest.java
new file mode 100644
index 00000000000..dfd19642f08
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpObserverTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ThrottledOpObserverTest extends QuorumBase {
+ @BeforeClass
+ public static void applyMockUps() {
+ ThrottledOpHelper.applyMockUps();
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp(true /* withObservers */);
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ @Test
+ public void testThrottledOpObserver() throws IOException, InterruptedException, KeeperException {
+ ZooKeeper zk = null;
+ try {
+ zk = createClient("localhost:" + getFirstObserverClientPort());
+ ZooKeeperServer zs = getFirstObserver().getActiveServer();
+ ThrottledOpHelper test = new ThrottledOpHelper();
+ test.testThrottledOp(zk, zs);
+ } finally {
+ if (zk != null) {
+ zk.close();
+ }
+ }
+ }
+
+ @Test
+ public void testThrottledAclObserver() throws Exception {
+ ZooKeeper zk = null;
+ try {
+ zk = createClient("localhost:" + getFirstObserverClientPort());
+ ZooKeeperServer zs = getFirstObserver().getActiveServer();
+ ThrottledOpHelper test = new ThrottledOpHelper();
+ test.testThrottledAcl(zk, zs);
+ } finally {
+ if (zk != null) {
+ zk.close();
+ }
+ }
+ }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpQuorumTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpQuorumTest.java
new file mode 100644
index 00000000000..27613657da4
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpQuorumTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ThrottledOpQuorumTest extends QuorumBase {
+ @BeforeClass
+ public static void applyMockUps() {
+ ThrottledOpHelper.applyMockUps();
+ }
+
+ @Test
+ public void testThrottledOpLeader() throws IOException, InterruptedException, KeeperException {
+ ZooKeeper zk = null;
+ try {
+ zk = createClient("localhost:" + getLeaderClientPort());
+ ZooKeeperServer zs = getLeaderQuorumPeer().getActiveServer();
+ ThrottledOpHelper test = new ThrottledOpHelper();
+ test.testThrottledOp(zk, zs);
+ } finally {
+ if (zk != null) {
+ zk.close();
+ }
+ }
+ }
+
+ @Test
+ public void testThrottledAclLeader() throws Exception {
+ ZooKeeper zk = null;
+ try {
+ zk = createClient("localhost:" + getLeaderClientPort());
+ ZooKeeperServer zs = getLeaderQuorumPeer().getActiveServer();
+ ThrottledOpHelper test = new ThrottledOpHelper();
+ test.testThrottledAcl(zk, zs);
+ } finally {
+ if (zk != null) {
+ zk.close();
+ }
+ }
+ }
+
+ @Test
+ public void testThrottledOpFollower() throws IOException, InterruptedException, KeeperException {
+ ZooKeeper zk = null;
+ try {
+ int clientPort = (getLeaderClientPort() == portClient1) ? portClient2 : portClient1;
+ zk = createClient("localhost:" + clientPort);
+ QuorumPeer qp = (getLeaderClientPort() == portClient1) ? s2 : s1;
+ ZooKeeperServer zs = qp.getActiveServer();
+ ThrottledOpHelper test = new ThrottledOpHelper();
+ test.testThrottledOp(zk, zs);
+ } finally {
+ if (zk != null) {
+ zk.close();
+ }
+ }
+ }
+
+ @Test
+ public void testThrottledAclFollower() throws Exception {
+ ZooKeeper zk = null;
+ try {
+ int clientPort = (getLeaderClientPort() == portClient1) ? portClient2 : portClient1;
+ zk = createClient("localhost:" + clientPort);
+ QuorumPeer qp = (getLeaderClientPort() == portClient1) ? s2 : s1;
+ ZooKeeperServer zs = qp.getActiveServer();
+ ThrottledOpHelper test = new ThrottledOpHelper();
+ test.testThrottledAcl(zk, zs);
+ } finally {
+ if (zk != null) {
+ zk.close();
+ }
+ }
+ }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpStandaloneTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpStandaloneTest.java
new file mode 100644
index 00000000000..27ac6a6939c
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpStandaloneTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ThrottledOpStandaloneTest extends ClientBase {
+
+ @BeforeClass
+ public static void applyMockUps() {
+ ThrottledOpHelper.applyMockUps();
+ }
+
+ @Test
+ public void testThrottledOp() throws IOException, InterruptedException, KeeperException {
+ ZooKeeper zk = null;
+ try {
+ zk = createClient(hostPort);
+ ZooKeeperServer zs = serverFactory.getZooKeeperServer();
+ ThrottledOpHelper test = new ThrottledOpHelper();
+ test.testThrottledOp(zk, zs);
+ } finally {
+ if (zk != null) {
+ zk.close();
+ }
+ }
+ }
+
+ @Test
+ public void testThrottledAcl() throws Exception {
+ ZooKeeper zk = null;
+ try {
+ zk = createClient(hostPort);
+ ZooKeeperServer zs = serverFactory.getZooKeeperServer();
+ ThrottledOpHelper test = new ThrottledOpHelper();
+ test.testThrottledAcl(zk, zs);
+ } finally {
+ if (zk != null) {
+ zk.close();
+ }
+ }
+ }
+}