From 65b292ccf364c3da966451140a0f4927e9c4388b Mon Sep 17 00:00:00 2001 From: Jie Huang Date: Wed, 23 Jan 2019 14:00:13 +0100 Subject: [PATCH] ZOOKEEPER-3242: Add server side connecting throttling Author: Jie Huang Reviewers: fangmin@apache.org, andor@apache.org Closes #769 from jhuan31/ZOOKEEPER-3242 and squashes the following commits: c3ec81f4e [Jie Huang] refactoring 86cad39c4 [Jie Huang] Use a mock random number generator to make the unit test flaky-proof a278504d4 [Jie Huang] Add unit tests for server-side connection throttling fd966502b [Jie Huang] update doc for server-side connection throttling 2f1ed0b87 [Jie Huang] Fix FindBugs Warnings a48b0fcb1 [Jie Huang] ZOOKEEPER-3242: Add server side connecting throttling --- .../main/resources/markdown/zookeeperAdmin.md | 68 +++++ .../apache/zookeeper/server/BlueThrottle.java | 268 ++++++++++++++++++ .../server/ClientCnxnLimitException.java | 30 ++ .../zookeeper/server/NIOServerCnxn.java | 12 +- .../server/NIOServerCnxnFactory.java | 1 + .../zookeeper/server/NettyServerCnxn.java | 7 + .../server/NettyServerCnxnFactory.java | 1 + .../zookeeper/server/ServerMetrics.java | 4 + .../zookeeper/server/ZooKeeperServer.java | 24 +- .../zookeeper/server/ZooKeeperServerBean.java | 73 ++++- .../server/ZooKeeperServerMXBean.java | 22 ++ .../zookeeper/server/admin/Commands.java | 1 + .../zookeeper/server/BlueThrottleTest.java | 157 ++++++++++ .../zookeeper/server/admin/CommandsTest.java | 3 +- 14 files changed, 666 insertions(+), 5 deletions(-) create mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java create mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/server/ClientCnxnLimitException.java create mode 100644 zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md index 95787b743a5..57df547b20f 100644 --- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md @@ -738,6 +738,74 @@ property, when available, is noted below. strategy from the configured minimum (fastleader.minNotificationInterval) and the configured maximum (this) for long elections. +* *connectionMaxTokens* : + (Java system property: **zookeeper.connection_throttle_tokens**) + **New in 3.6.0:** + This is one of the parameters to tune the server-side connection throttler, + which is a token-based rate limiting mechanism with optional probabilistic + dropping. + This parameter defines the maximum number of tokens in the token-bucket. + When set to 0, throttling is disabled. Default is 0. + +* *connectionTokenFillTime* : + (Java system property: **zookeeper.connection_throttle_fill_time**) + **New in 3.6.0:** + This is one of the parameters to tune the server-side connection throttler, + which is a token-based rate limiting mechanism with optional probabilistic + dropping. + This parameter defines the interval in milliseconds when the token bucket is re-filled with + *connectionTokenFillCount* tokens. Default is 1. + +* *connectionTokenFillCount* : + (Java system property: **zookeeper.connection_throttle_fill_count**) + **New in 3.6.0:** + This is one of the parameters to tune the server-side connection throttler, + which is a token-based rate limiting mechanism with optional probabilistic + dropping. + This parameter defines the number of tokens to add to the token bucket every + *connectionTokenFillTime* milliseconds. Default is 1. + +* *connectionFreezeTime* : + (Java system property: **zookeeper.connection_throttle_freeze_time**) + **New in 3.6.0:** + This is one of the parameters to tune the server-side connection throttler, + which is a token-based rate limiting mechanism with optional probabilistic + dropping. + This parameter defines the interval in milliseconds when the dropping + probability is adjusted. When set to -1, probabilistic dropping is disabled. + Default is -1. + +* *connectionDropIncrease* : + (Java system property: **zookeeper.connection_throttle_drop_increase**) + **New in 3.6.0:** + This is one of the parameters to tune the server-side connection throttler, + which is a token-based rate limiting mechanism with optional probabilistic + dropping. + This parameter defines the dropping probability to increase. The throttler + checks every *connectionFreezeTime* milliseconds and if the token bucket is + empty, the dropping probability will be increased by *connectionDropIncrease*. + The default is 0.02. + +* *connectionDropDecrease* : + (Java system property: **zookeeper.connection_throttle_drop_decrease**) + **New in 3.6.0:** + This is one of the parameters to tune the server-side connection throttler, + which is a token-based rate limiting mechanism with optional probabilistic + dropping. + This parameter defines the dropping probability to decrease. The throttler + checks every *connectionFreezeTime* milliseconds and if the token bucket has + more tokens than a threshold, the dropping probability will be decreased by + *connectionDropDecrease*. The threshold is *connectionMaxTokens* \* + *connectionDecreaseRatio*. The default is 0.002. + +* *connectionDecreaseRatio* : + (Java system property: **zookeeper.connection_throttle_decrease_ratio**) + **New in 3.6.0:** + This is one of the parameters to tune the server-side connection throttler, + which is a token-based rate limiting mechanism with optional probabilistic + dropping. This parameter defines the threshold to decrease the dropping + probability. The default is 0. + #### Cluster Options diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java new file mode 100644 index 00000000000..1aa9e5b838d --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import java.util.Random; + +import org.apache.zookeeper.common.Time; + +/** + * Implements a token-bucket based rate limiting mechanism with optional + * probabilistic dropping inspired by the BLUE queue management algorithm [1]. + * + * The throttle provides the {@link #checkLimit(int)} method which provides + * a binary yes/no decision. + * + * The core token bucket algorithm starts with an initial set of tokens based + * on the maxTokens setting. Tokens are dispensed each + * {@link #checkLimit(int)} call, which fails if there are not enough tokens to + * satisfy a given request. + * + * The token bucket refills over time, providing fillCount tokens + * every fillTime milliseconds, capping at maxTokens. + * + * This design allows the throttle to allow short bursts to pass, while still + * capping the total number of requests per time interval. + * + * One issue with a pure token bucket approach for something like request or + * connection throttling is that the wall clock arrival time of requests affects + * the probability of a request being allowed to pass or not. Under constant + * load this can lead to request starvation for requests that constantly arrive + * later than the majority. + * + * In an attempt to combat this, this throttle can also provide probabilistic + * dropping. This is enabled anytime freezeTime is set to a value + * other than -1. + * + * The probabilistic algorithm starts with an initial drop probability of 0, and + * adjusts this probability roughly every freezeTime milliseconds. + * The first request after freezeTime, the algorithm checks the + * token bucket. If the token bucket is empty, the drop probability is increased + * by dropIncrease up to a maximum of 1. Otherwise, if + * the bucket has a token deficit less than decreasePoint * maxTokens, + * the probability is decreased by dropDecrease. + * + * Given a call to {@link #checkLimit(int)}, requests are first dropped randomly + * based on the current drop probability, and only surviving requests are then + * checked against the token bucket. + * + * When under constant load, the probabilistic algorithm will adapt to a drop + * frequency that should keep requests within the token limit. When load drops, + * the drop probability will decrease, eventually returning to zero if possible. + * + * [1] "BLUE: A New Class of Active Queue Management Algorithms" + **/ + +public class BlueThrottle { + private int maxTokens; + private int fillTime; + private int fillCount; + private int tokens; + private long lastTime; + + private int freezeTime; + private long lastFreeze; + private double dropIncrease; + private double dropDecrease; + private double decreasePoint; + private double drop; + + Random rng; + + public static final String CONNECTION_THROTTLE_TOKENS = "zookeeper.connection_throttle_tokens"; + public static final int DEFAULT_CONNECTION_THROTTLE_TOKENS; + + public static final String CONNECTION_THROTTLE_FILL_TIME = "zookeeper.connection_throttle_fill_time"; + public static final int DEFAULT_CONNECTION_THROTTLE_FILL_TIME; + + public static final String CONNECTION_THROTTLE_FILL_COUNT = "zookeeper.connection_throttle_fill_count"; + public static final int DEFAULT_CONNECTION_THROTTLE_FILL_COUNT; + + public static final String CONNECTION_THROTTLE_FREEZE_TIME = "zookeeper.connection_throttle_freeze_time"; + public static final int DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME; + + public static final String CONNECTION_THROTTLE_DROP_INCREASE = "zookeeper.connection_throttle_drop_increase"; + public static final double DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE; + + public static final String CONNECTION_THROTTLE_DROP_DECREASE = "zookeeper.connection_throttle_drop_decrease"; + public static final double DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE; + + public static final String CONNECTION_THROTTLE_DECREASE_RATIO = "zookeeper.connection_throttle_decrease_ratio"; + public static final double DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO; + + + static { + DEFAULT_CONNECTION_THROTTLE_TOKENS = Integer.getInteger(CONNECTION_THROTTLE_TOKENS, 0); + DEFAULT_CONNECTION_THROTTLE_FILL_TIME = Integer.getInteger(CONNECTION_THROTTLE_FILL_TIME, 1); + DEFAULT_CONNECTION_THROTTLE_FILL_COUNT = Integer.getInteger(CONNECTION_THROTTLE_FILL_COUNT, 1); + + DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME = Integer.getInteger(CONNECTION_THROTTLE_FREEZE_TIME, -1); + DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE = getDoubleProp(CONNECTION_THROTTLE_DROP_INCREASE, 0.02); + DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE = getDoubleProp(CONNECTION_THROTTLE_DROP_DECREASE, 0.002); + DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO = getDoubleProp(CONNECTION_THROTTLE_DECREASE_RATIO, 0); + } + + /* Varation of Integer.getInteger for real number properties */ + private static double getDoubleProp(String name, double def) { + String val = System.getProperty(name); + if(val != null) { + return Double.parseDouble(val); + } + else { + return def; + } + } + + + public BlueThrottle() { + // Disable throttling by default (maxTokens = 0) + this.maxTokens = DEFAULT_CONNECTION_THROTTLE_TOKENS; + this.fillTime = DEFAULT_CONNECTION_THROTTLE_FILL_TIME; + this.fillCount = DEFAULT_CONNECTION_THROTTLE_FILL_COUNT; + this.tokens = maxTokens; + this.lastTime = Time.currentElapsedTime(); + + // Disable BLUE throttling by default (freezeTime = -1) + this.freezeTime = DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME; + this.lastFreeze = Time.currentElapsedTime(); + this.dropIncrease = DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE; + this.dropDecrease = DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE; + this.decreasePoint = DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO; + this.drop = 0; + + this.rng = new Random(); + } + + public synchronized void setMaxTokens(int max) { + int deficit = maxTokens - tokens; + maxTokens = max; + tokens = max - deficit; + } + + public synchronized void setFillTime(int time) { + fillTime = time; + } + + public synchronized void setFillCount(int count) { + fillCount = count; + } + + public synchronized void setFreezeTime(int time) { + freezeTime = time; + } + + public synchronized void setDropIncrease(double increase) { + dropIncrease = increase; + } + + public synchronized void setDropDecrease(double decrease) { + dropDecrease = decrease; + } + + public synchronized void setDecreasePoint(double ratio) { + decreasePoint = ratio; + } + + public synchronized int getMaxTokens() { + return maxTokens; + } + + public synchronized int getFillTime() { + return fillTime; + } + + public synchronized int getFillCount() { + return fillCount; + } + + public synchronized int getFreezeTime() { + return freezeTime; + } + + public synchronized double getDropIncrease() { + return dropIncrease; + } + + public synchronized double getDropDecrease() { + return dropDecrease; + } + + public synchronized double getDecreasePoint() { + return decreasePoint; + } + + public synchronized double getDropChance() { + return drop; + } + + public synchronized int getDeficit() { + return maxTokens - tokens; + } + + public synchronized boolean checkLimit(int need) { + // A maxTokens setting of zero disables throttling + if (maxTokens == 0) + return true; + + long now = Time.currentElapsedTime(); + long diff = now - lastTime; + + if (diff > fillTime) { + int refill = (int)(diff * fillCount / fillTime); + tokens = Math.min(tokens + refill, maxTokens); + lastTime = now; + } + + // A freeze time of -1 disables BLUE randomized throttling + if(freezeTime != -1) { + if(!checkBlue(now)) { + return false; + } + } + + if (tokens < need) { + return false; + } + + tokens -= need; + return true; + } + + public synchronized boolean checkBlue(long now) { + int length = maxTokens - tokens; + int limit = maxTokens; + long diff = now - lastFreeze; + long threshold = Math.round(maxTokens * decreasePoint); + + if (diff > freezeTime) { + if((length == limit) && (drop < 1)) { + drop = Math.min(drop + dropIncrease, 1); + } + else if ((length <= threshold) && (drop > 0)) { + drop = Math.max(drop - dropDecrease, 0); + } + lastFreeze = now; + } + + if (rng.nextDouble() < drop) { + return false; + } + return true; + } +}; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ClientCnxnLimitException.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ClientCnxnLimitException.java new file mode 100644 index 00000000000..38f89959a5e --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ClientCnxnLimitException.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +/** + * Indicates that the number of client connections has exceeded some limit. + * @see org.apache.zookeeper.server.ClientCnxnLimit#checkLimit() + * @see org.apache.zookeeper.server.ClientCnxnLimit#checkLimit(int) + */ +public class ClientCnxnLimitException extends Exception { + public ClientCnxnLimitException() { + super("Connection throttle rejected connection"); + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java index c2ab78487a8..f7e382facc4 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java @@ -151,7 +151,7 @@ public void sendBuffer(ByteBuffer... buffers) { } /** Read the request payload (everything following the length prefix) */ - private void readPayload() throws IOException, InterruptedException { + private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException { if (incomingBuffer.remaining() != 0) { // have we read length bytes? int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok if (rc < 0) { @@ -360,6 +360,14 @@ void doIO(SelectionKey k) throws InterruptedException { LOG.warn(e.getMessage()); // expecting close to log session closure close(); + } catch (ClientCnxnLimitException e) { + // Common case exception, print at debug level + ServerMetrics.CONNECTION_REJECTED.add(1); + if (LOG.isDebugEnabled()) { + LOG.debug("Exception causing close of session 0x" + + Long.toHexString(sessionId) + ": " + e.getMessage()); + } + close(); } catch (IOException e) { LOG.warn("Exception causing close of session 0x" + Long.toHexString(sessionId) + ": " + e.getMessage()); @@ -407,7 +415,7 @@ public void enableRecv() { } } - private void readConnectRequest() throws IOException, InterruptedException { + private void readConnectRequest() throws IOException, InterruptedException, ClientCnxnLimitException { if (!isZKServerRunning()) { throw new IOException("ZooKeeperServer not running"); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java index 4e7e5db45f8..090ee7b2c71 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java @@ -310,6 +310,7 @@ private boolean doAccept() { acceptErrorLogger.flush(); } catch (IOException e) { // accept, maxClientCnxns, configureBlocking + ServerMetrics.CONNECTION_REJECTED.add(1); acceptErrorLogger.rateLimitLog( "Error accepting new connection: " + e.getMessage()); fastCloseSock(sc); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java index b6bb343f49d..8b4f70f7156 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java @@ -486,6 +486,13 @@ private void receiveMessage(ByteBuf message) { } catch(IOException e) { LOG.warn("Closing connection to " + getRemoteSocketAddress(), e); close(); + } catch(ClientCnxnLimitException e) { + // Common case exception, print at debug level + ServerMetrics.CONNECTION_REJECTED.add(1); + if (LOG.isDebugEnabled()) { + LOG.debug("Closing connection to " + getRemoteSocketAddress(), e); + } + close(); } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java index d96f56de9f0..e0d55a4fbc1 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java @@ -108,6 +108,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { InetAddress addr = ((InetSocketAddress) channel.remoteAddress()) .getAddress(); if (maxClientCnxns > 0 && getClientCnxnCount(addr) >= maxClientCnxns) { + ServerMetrics.CONNECTION_REJECTED.add(1); LOG.warn("Too many connections from {} - max is {}", addr, maxClientCnxns); channel.close(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java index 3420b88e8b8..dcc04a3344c 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java @@ -67,6 +67,10 @@ public enum ServerMetrics { SNAP_COUNT(new SimpleCounter("snap_count")), COMMIT_COUNT(new SimpleCounter("commit_count")), CONNECTION_REQUEST_COUNT(new SimpleCounter("connection_request_count")), + // Connection throttling related + CONNECTION_TOKEN_DEFICIT(new AvgMinMaxCounter("connection_token_deficit")), + CONNECTION_REJECTED(new SimpleCounter("connection_rejected")), + BYTES_RECEIVED_COUNT(new SimpleCounter("bytes_received_count")), RESPONSE_PACKET_CACHE_HITS(new SimpleCounter("response_packet_cache_hits")), diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 833c79bab0d..ee0e4c2cb69 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -164,6 +164,9 @@ protected enum State { LOG.info(INT_BUFFER_STARTING_SIZE_BYTES + " = " + intBufferStartingSizeBytes); } + // Connection throttling + private BlueThrottle connThrottle; + void removeCnxn(ServerCnxn cnxn) { zkDb.removeCnxn(cnxn); } @@ -196,7 +199,11 @@ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, setMinSessionTimeout(minSessionTimeout); setMaxSessionTimeout(maxSessionTimeout); listener = new ZooKeeperServerListenerImpl(this); + readResponseCache = new ResponseCache(); + + connThrottle = new BlueThrottle(); + LOG.info("Created server with tickTime " + tickTime + " minSessionTimeout " + getMinSessionTimeout() + " maxSessionTimeout " + getMaxSessionTimeout() @@ -219,6 +226,10 @@ public ServerStats serverStats() { return serverStats; } + public BlueThrottle connThrottle() { + return connThrottle; + } + public void dumpConf(PrintWriter pwriter) { pwriter.print("clientPort="); pwriter.println(getClientPort()); @@ -1043,7 +1054,18 @@ public Map> getEphemerals() { return zkDb.getEphemerals(); } - public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { + public double getConnectionDropChance() { + return connThrottle.getDropChance(); + } + + public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) + throws IOException, ClientCnxnLimitException { + + if (connThrottle.checkLimit(1) == false) { + throw new ClientCnxnLimitException(); + } + ServerMetrics.CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit()); + BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer)); ConnectRequest connReq = new ConnectRequest(); connReq.deserialize(bia, "connect"); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java index deae98d9b40..b8cf70621cb 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java @@ -167,7 +167,7 @@ public String getSecureClientPort() { public String getSecureClientAddress() { if (zks.secureServerCnxnFactory != null) { return String.format("%s:%d", zks.secureServerCnxnFactory - .getLocalAddress().getHostString(), + .getLocalAddress().getHostString(), zks.secureServerCnxnFactory.getLocalPort()); } return ""; @@ -207,4 +207,75 @@ public boolean getResponseCachingEnabled() { public void setResponseCachingEnabled(boolean isEnabled) { zks.setResponseCachingEnabled(isEnabled); } + + // Connection throttling settings + /////////////////////////////////////////////////////////////////////////// + + public int getConnectionMaxTokens() { + return zks.connThrottle().getMaxTokens(); + } + + public void setConnectionMaxTokens(int val) { + zks.connThrottle().setMaxTokens(val); + } + + /////////////////////////////////////////////////////////////////////////// + + public int getConnectionTokenFillTime() { + return zks.connThrottle().getFillTime(); + } + + public void setConnectionTokenFillTime(int val) { + zks.connThrottle().setFillTime(val); + } + + /////////////////////////////////////////////////////////////////////////// + + public int getConnectionTokenFillCount() { + return zks.connThrottle().getFillCount(); + } + + public void setConnectionTokenFillCount(int val) { + zks.connThrottle().setFillCount(val); + } + + /////////////////////////////////////////////////////////////////////////// + + public int getConnectionFreezeTime() { + return zks.connThrottle().getFreezeTime(); + } + + public void setConnectionFreezeTime(int val) { + zks.connThrottle().setFreezeTime(val); + } + + /////////////////////////////////////////////////////////////////////////// + + public double getConnectionDropIncrease() { + return zks.connThrottle().getDropIncrease(); + } + + public void setConnectionDropIncrease(double val) { + zks.connThrottle().setDropIncrease(val); + } + + /////////////////////////////////////////////////////////////////////////// + + public double getConnectionDropDecrease() { + return zks.connThrottle().getDropDecrease(); + } + + public void setConnectionDropDecrease(double val) { + zks.connThrottle().setDropDecrease(val); + } + + /////////////////////////////////////////////////////////////////////////// + + public double getConnectionDecreaseRatio() { + return zks.connThrottle().getDecreasePoint(); + } + + public void setConnectionDecreaseRatio(double val) { + zks.connThrottle().setDecreasePoint(val); + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java index bd4d3498d2e..91c8c82bd42 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java @@ -98,6 +98,28 @@ public interface ZooKeeperServerMXBean { public boolean getResponseCachingEnabled(); public void setResponseCachingEnabled(boolean isEnabled); + /* Connection throttling settings */ + public int getConnectionMaxTokens(); + public void setConnectionMaxTokens(int val); + + public int getConnectionTokenFillTime(); + public void setConnectionTokenFillTime(int val); + + public int getConnectionTokenFillCount(); + public void setConnectionTokenFillCount(int val); + + public int getConnectionFreezeTime(); + public void setConnectionFreezeTime(int val); + + public double getConnectionDropIncrease(); + public void setConnectionDropIncrease(double val); + + public double getConnectionDropDecrease(); + public void setConnectionDropDecrease(double val); + + public double getConnectionDecreaseRatio(); + public void setConnectionDecreaseRatio(double val); + /** * Reset packet and latency statistics */ diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java index f1e5500563c..b0fa4ff010d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java @@ -354,6 +354,7 @@ public CommandResponse run(ZooKeeperServer zkServer, Map kwargs) OSMXBean osMbean = new OSMXBean(); response.put("open_file_descriptor_count", osMbean.getOpenFileDescriptorCount()); response.put("max_file_descriptor_count", osMbean.getMaxFileDescriptorCount()); + response.put("connection_drop_probability", zkServer.getConnectionDropChance()); response.put("last_client_response_size", stats.getClientResponseStats().getLastBufferSize()); response.put("max_client_response_size", stats.getClientResponseStats().getMaxBufferSize()); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java new file mode 100644 index 00000000000..aa27a15f7f4 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server; + +import org.apache.zookeeper.ZKTestCase; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Random; + +public class BlueThrottleTest extends ZKTestCase { + private static final Logger LOG = LoggerFactory.getLogger(BlueThrottleTest.class); + + class MockRandom extends Random { + int flag = 0; + BlueThrottle throttle; + + @Override + public double nextDouble() { + if (throttle.getDropChance() > 0) { + flag = 1 - flag; + return flag; + } else { + return 1; + } + } + } + + class BlueThrottleWithMockRandom extends BlueThrottle { + public BlueThrottleWithMockRandom(MockRandom random) { + super(); + this.rng = random; + random.throttle = this; + } + } + + @Test + public void testThrottleDisabled() { + BlueThrottle throttler = new BlueThrottle(); + Assert.assertTrue("Throttle should be disabled by default", throttler.checkLimit(1)); + } + + @Test + public void testThrottleWithoutRefill() { + BlueThrottle throttler = new BlueThrottle(); + throttler.setMaxTokens(1); + throttler.setFillTime(2000); + Assert.assertTrue("First request should be allowed", throttler.checkLimit(1)); + Assert.assertFalse("Second request should be denied", throttler.checkLimit(1)); + } + + @Test + public void testThrottleWithRefill() throws InterruptedException { + BlueThrottle throttler = new BlueThrottle(); + throttler.setMaxTokens(1); + throttler.setFillTime(500); + Assert.assertTrue("First request should be allowed", throttler.checkLimit(1)); + Assert.assertFalse("Second request should be denied", throttler.checkLimit(1)); + + //wait for the bucket to be refilled + Thread.sleep(750); + Assert.assertTrue("Third request should be allowed since we've got a new token", throttler.checkLimit(1)); + } + + @Test + public void testThrottleWithoutRandomDropping() throws InterruptedException { + int maxTokens = 5; + BlueThrottle throttler = new BlueThrottleWithMockRandom(new MockRandom()); + throttler.setMaxTokens(maxTokens); + throttler.setFillCount(maxTokens); + throttler.setFillTime(1000); + + for (int i=0;i0); + + //allow bucket to be refilled + Thread.sleep(1500); + + for (int i=0;i0); + LOG.info("Dropping probability is {}", throttler.getDropChance()); + + //allow bucket to be refilled + Thread.sleep(1100); + LOG.info("Bucket is refilled with {} tokens.", maxTokens); + + int accepted = 0; + for (int i=0;i 0); + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java index 9b30c555dc0..000b3cea413 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java @@ -190,7 +190,8 @@ public void testMonitor() throws IOException, InterruptedException { new Field("min_client_response_size", Integer.class), new Field("uptime", Long.class), new Field("global_sessions", Long.class), - new Field("local_sessions", Long.class) + new Field("local_sessions", Long.class), + new Field("connection_drop_probability", Double.class) )); for (String metric : ServerMetrics.getAllValues().keySet()) { if (metric.startsWith("avg_")) {