diff --git a/build.xml b/build.xml index 265a2845b3b..41630fb4085 100644 --- a/build.xml +++ b/build.xml @@ -28,7 +28,7 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant"> - + diff --git a/pom.xml b/pom.xml index 22faf7ae8bc..977d693f389 100755 --- a/pom.xml +++ b/pom.xml @@ -278,7 +278,8 @@ 1.1.0 1.56 3.2.2 - 3.1.8 + 2.6 + 3.1.9 @@ -293,6 +294,11 @@ commons-collections ${commons-collections.version} + + commons-lang + commons-lang + ${commons-lang.version} + org.apache.yetus audience-annotations @@ -472,7 +478,7 @@ com.github.spotbugs spotbugs-maven-plugin - 3.1.8 + 3.1.9 excludeFindBugsFilter.xml diff --git a/zookeeper-contrib/pom.xml b/zookeeper-contrib/pom.xml index b796ecf40c9..9e2573c0c91 100755 --- a/zookeeper-contrib/pom.xml +++ b/zookeeper-contrib/pom.xml @@ -34,11 +34,25 @@ Contrib projects to Apache ZooKeeper - + + + + + com.github.spotbugs + spotbugs-maven-plugin + 3.1.9 + + + true + + + + + zookeeper-contrib-loggraph zookeeper-contrib-rest zookeeper-contrib-zooinspector - \ No newline at end of file + diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md index d808b612a99..57df547b20f 100644 --- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md @@ -720,6 +720,92 @@ property, when available, is noted below. of the observers on restart. Set to "false" to disable this feature. Default is "true" +* *fastleader.minNotificationInterval* : + (Java system property: **zookeeper.fastleader.minNotificationInterval**) + Lower bound for length of time between two consecutive notification + checks on the leader election. This interval determines how long a + peer waits to check the set of election votes and effects how + quickly an election can resolve. The interval follows a backoff + strategy from the configured minimum (this) and the configured maximum + (fastleader.maxNotificationInterval) for long elections. + +* *fastleader.maxNotificationInterval* : + (Java system property: **zookeeper.fastleader.maxNotificationInterval**) + Upper bound for length of time between two consecutive notification + checks on the leader election. This interval determines how long a + peer waits to check the set of election votes and effects how + quickly an election can resolve. The interval follows a backoff + strategy from the configured minimum (fastleader.minNotificationInterval) + and the configured maximum (this) for long elections. + +* *connectionMaxTokens* : + (Java system property: **zookeeper.connection_throttle_tokens**) + **New in 3.6.0:** + This is one of the parameters to tune the server-side connection throttler, + which is a token-based rate limiting mechanism with optional probabilistic + dropping. + This parameter defines the maximum number of tokens in the token-bucket. + When set to 0, throttling is disabled. Default is 0. + +* *connectionTokenFillTime* : + (Java system property: **zookeeper.connection_throttle_fill_time**) + **New in 3.6.0:** + This is one of the parameters to tune the server-side connection throttler, + which is a token-based rate limiting mechanism with optional probabilistic + dropping. + This parameter defines the interval in milliseconds when the token bucket is re-filled with + *connectionTokenFillCount* tokens. Default is 1. + +* *connectionTokenFillCount* : + (Java system property: **zookeeper.connection_throttle_fill_count**) + **New in 3.6.0:** + This is one of the parameters to tune the server-side connection throttler, + which is a token-based rate limiting mechanism with optional probabilistic + dropping. + This parameter defines the number of tokens to add to the token bucket every + *connectionTokenFillTime* milliseconds. Default is 1. + +* *connectionFreezeTime* : + (Java system property: **zookeeper.connection_throttle_freeze_time**) + **New in 3.6.0:** + This is one of the parameters to tune the server-side connection throttler, + which is a token-based rate limiting mechanism with optional probabilistic + dropping. + This parameter defines the interval in milliseconds when the dropping + probability is adjusted. When set to -1, probabilistic dropping is disabled. + Default is -1. + +* *connectionDropIncrease* : + (Java system property: **zookeeper.connection_throttle_drop_increase**) + **New in 3.6.0:** + This is one of the parameters to tune the server-side connection throttler, + which is a token-based rate limiting mechanism with optional probabilistic + dropping. + This parameter defines the dropping probability to increase. The throttler + checks every *connectionFreezeTime* milliseconds and if the token bucket is + empty, the dropping probability will be increased by *connectionDropIncrease*. + The default is 0.02. + +* *connectionDropDecrease* : + (Java system property: **zookeeper.connection_throttle_drop_decrease**) + **New in 3.6.0:** + This is one of the parameters to tune the server-side connection throttler, + which is a token-based rate limiting mechanism with optional probabilistic + dropping. + This parameter defines the dropping probability to decrease. The throttler + checks every *connectionFreezeTime* milliseconds and if the token bucket has + more tokens than a threshold, the dropping probability will be decreased by + *connectionDropDecrease*. The threshold is *connectionMaxTokens* \* + *connectionDecreaseRatio*. The default is 0.002. + +* *connectionDecreaseRatio* : + (Java system property: **zookeeper.connection_throttle_decrease_ratio**) + **New in 3.6.0:** + This is one of the parameters to tune the server-side connection throttler, + which is a token-based rate limiting mechanism with optional probabilistic + dropping. This parameter defines the threshold to decrease the dropping + probability. The default is 0. + #### Cluster Options diff --git a/zookeeper-recipes/pom.xml b/zookeeper-recipes/pom.xml index 70513338191..0aecd580c97 100755 --- a/zookeeper-recipes/pom.xml +++ b/zookeeper-recipes/pom.xml @@ -62,4 +62,4 @@ zookeeper-recipes-queue - \ No newline at end of file + diff --git a/zookeeper-recipes/zookeeper-recipes-election/src/main/java/org/apache/zookeeper/recipes/leader/LeaderElectionSupport.java b/zookeeper-recipes/zookeeper-recipes-election/src/main/java/org/apache/zookeeper/recipes/leader/LeaderElectionSupport.java index 8d4096d8bac..6013ffd908e 100644 --- a/zookeeper-recipes/zookeeper-recipes-election/src/main/java/org/apache/zookeeper/recipes/leader/LeaderElectionSupport.java +++ b/zookeeper-recipes/zookeeper-recipes-election/src/main/java/org/apache/zookeeper/recipes/leader/LeaderElectionSupport.java @@ -180,27 +180,36 @@ private void makeOffer() throws KeeperException, InterruptedException { state = State.OFFER; dispatchEvent(EventType.OFFER_START); - leaderOffer = new LeaderOffer(); - - leaderOffer.setHostName(hostName); - leaderOffer.setNodePath(zooKeeper.create(rootNodeName + "/" + "n_", - hostName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + LeaderOffer newLeaderOffer = new LeaderOffer(); + byte[] hostnameBytes; + synchronized (this) { + newLeaderOffer.setHostName(hostName); + hostnameBytes = hostName.getBytes(); + newLeaderOffer.setNodePath(zooKeeper.create(rootNodeName + "/" + "n_", + hostnameBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL)); - + leaderOffer = newLeaderOffer; + } logger.debug("Created leader offer {}", leaderOffer); dispatchEvent(EventType.OFFER_COMPLETE); } + private synchronized LeaderOffer getLeaderOffer() { + return leaderOffer; + } + private void determineElectionStatus() throws KeeperException, InterruptedException { state = State.DETERMINE; dispatchEvent(EventType.DETERMINE_START); - String[] components = leaderOffer.getNodePath().split("/"); + LeaderOffer currentLeaderOffer = getLeaderOffer(); + + String[] components = currentLeaderOffer.getNodePath().split("/"); - leaderOffer.setId(Integer.valueOf(components[components.length - 1] + currentLeaderOffer.setId(Integer.valueOf(components[components.length - 1] .substring("n_".length()))); List leaderOffers = toLeaderOffers(zooKeeper.getChildren( @@ -215,7 +224,7 @@ private void determineElectionStatus() throws KeeperException, for (int i = 0; i < leaderOffers.size(); i++) { LeaderOffer leaderOffer = leaderOffers.get(i); - if (leaderOffer.getId().equals(this.leaderOffer.getId())) { + if (leaderOffer.getId().equals(currentLeaderOffer.getId())) { logger.debug("There are {} leader offers. I am {} in line.", leaderOffers.size(), i); @@ -237,7 +246,7 @@ private void becomeReady(LeaderOffer neighborLeaderOffer) throws KeeperException, InterruptedException { logger.info("{} not elected leader. Watching node:{}", - leaderOffer.getNodePath(), neighborLeaderOffer.getNodePath()); + getLeaderOffer().getNodePath(), neighborLeaderOffer.getNodePath()); /* * Make sure to pass an explicit Watcher because we could be sharing this @@ -270,7 +279,7 @@ private void becomeLeader() { state = State.ELECTED; dispatchEvent(EventType.ELECTED_START); - logger.info("Becoming leader with node:{}", leaderOffer.getNodePath()); + logger.info("Becoming leader with node:{}", getLeaderOffer().getNodePath()); dispatchEvent(EventType.ELECTED_COMPLETE); } @@ -336,7 +345,7 @@ private List toLeaderOffers(List strings) @Override public void process(WatchedEvent event) { if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) { - if (!event.getPath().equals(leaderOffer.getNodePath()) + if (!event.getPath().equals(getLeaderOffer().getNodePath()) && state != State.STOP) { logger.debug( "Node {} deleted. Need to run through the election process.", @@ -384,8 +393,8 @@ public void removeListener(LeaderElectionAware listener) { @Override public String toString() { - return "{ state:" + state + " leaderOffer:" + leaderOffer + " zooKeeper:" - + zooKeeper + " hostName:" + hostName + " listeners:" + listeners + return "{ state:" + state + " leaderOffer:" + getLeaderOffer() + " zooKeeper:" + + zooKeeper + " hostName:" + getHostName() + " listeners:" + listeners + " }"; } @@ -437,11 +446,11 @@ public void setZooKeeper(ZooKeeper zooKeeper) { * The hostname of this process. Mostly used as a convenience for logging and * to respond to {@link #getLeaderHostName()} requests. */ - public String getHostName() { + public synchronized String getHostName() { return hostName; } - public void setHostName(String hostName) { + public synchronized void setHostName(String hostName) { this.hostName = hostName; } diff --git a/zookeeper-recipes/zookeeper-recipes-election/src/main/java/org/apache/zookeeper/recipes/leader/LeaderOffer.java b/zookeeper-recipes/zookeeper-recipes-election/src/main/java/org/apache/zookeeper/recipes/leader/LeaderOffer.java index 188a6d5753d..bef634d486c 100644 --- a/zookeeper-recipes/zookeeper-recipes-election/src/main/java/org/apache/zookeeper/recipes/leader/LeaderOffer.java +++ b/zookeeper-recipes/zookeeper-recipes-election/src/main/java/org/apache/zookeeper/recipes/leader/LeaderOffer.java @@ -16,6 +16,7 @@ */ package org.apache.zookeeper.recipes.leader; +import java.io.Serializable; import java.util.Comparator; /** @@ -72,7 +73,8 @@ public void setHostName(String hostName) { * Compare two instances of {@link LeaderOffer} using only the {code}id{code} * member. */ - public static class IdComparator implements Comparator { + public static class IdComparator + implements Comparator, Serializable { @Override public int compare(LeaderOffer o1, LeaderOffer o2) { diff --git a/zookeeper-recipes/zookeeper-recipes-lock/pom.xml b/zookeeper-recipes/zookeeper-recipes-lock/pom.xml index 64487285585..9e8d1471c6a 100755 --- a/zookeeper-recipes/zookeeper-recipes-lock/pom.xml +++ b/zookeeper-recipes/zookeeper-recipes-lock/pom.xml @@ -40,6 +40,13 @@ zookeeper-server 3.6.0-SNAPSHOT + + com.github.spotbugs + spotbugs-annotations + 3.1.9 + provided + true + org.apache.zookeeper zookeeper-server @@ -71,4 +78,4 @@ - \ No newline at end of file + diff --git a/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/WriteLock.java b/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/WriteLock.java index 5caebee4667..53f64cbdda5 100644 --- a/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/WriteLock.java +++ b/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/WriteLock.java @@ -17,6 +17,7 @@ */ package org.apache.zookeeper.recipes.lock; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.zookeeper.KeeperException; @@ -85,7 +86,7 @@ public WriteLock(ZooKeeper zookeeper, String dir, List acl, * return the current locklistener * @return the locklistener */ - public LockListener getLockListener() { + public synchronized LockListener getLockListener() { return this.callback; } @@ -93,7 +94,7 @@ public LockListener getLockListener() { * register a different call back listener * @param callback the call back instance */ - public void setLockListener(LockListener callback) { + public synchronized void setLockListener(LockListener callback) { this.callback = callback; } @@ -133,8 +134,9 @@ public boolean execute() throws KeeperException, initCause(e); } finally { - if (callback != null) { - callback.lockReleased(); + LockListener lockListener = getLockListener(); + if (lockListener != null) { + lockListener.lockReleased(); } id = null; } @@ -201,6 +203,8 @@ private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir * obtaining the lock * @return if the command was successful or not */ + @SuppressFBWarnings(value = "NP_NULL_PARAM_DEREF_NONVIRTUAL", + justification = "findPrefixInChildren will assign a value to this.id") public boolean execute() throws KeeperException, InterruptedException { do { if (id == null) { @@ -211,41 +215,40 @@ public boolean execute() throws KeeperException, InterruptedException { findPrefixInChildren(prefix, zookeeper, dir); idName = new ZNodeName(id); } - if (id != null) { - List names = zookeeper.getChildren(dir, false); - if (names.isEmpty()) { - LOG.warn("No children in: " + dir + " when we've just " + - "created one! Lets recreate it..."); - // lets force the recreation of the id - id = null; - } else { - // lets sort them explicitly (though they do seem to come back in order ususally :) - SortedSet sortedNames = new TreeSet(); - for (String name : names) { - sortedNames.add(new ZNodeName(dir + "/" + name)); + List names = zookeeper.getChildren(dir, false); + if (names.isEmpty()) { + LOG.warn("No children in: " + dir + " when we've just " + + "created one! Lets recreate it..."); + // lets force the recreation of the id + id = null; + } else { + // lets sort them explicitly (though they do seem to come back in order ususally :) + SortedSet sortedNames = new TreeSet(); + for (String name : names) { + sortedNames.add(new ZNodeName(dir + "/" + name)); + } + ownerId = sortedNames.first().getName(); + SortedSet lessThanMe = sortedNames.headSet(idName); + if (!lessThanMe.isEmpty()) { + ZNodeName lastChildName = lessThanMe.last(); + lastChildId = lastChildName.getName(); + if (LOG.isDebugEnabled()) { + LOG.debug("watching less than me node: " + lastChildId); } - ownerId = sortedNames.first().getName(); - SortedSet lessThanMe = sortedNames.headSet(idName); - if (!lessThanMe.isEmpty()) { - ZNodeName lastChildName = lessThanMe.last(); - lastChildId = lastChildName.getName(); - if (LOG.isDebugEnabled()) { - LOG.debug("watching less than me node: " + lastChildId); - } - Stat stat = zookeeper.exists(lastChildId, new LockWatcher()); - if (stat != null) { - return Boolean.FALSE; - } else { - LOG.warn("Could not find the" + - " stats for less than me: " + lastChildName.getName()); - } + Stat stat = zookeeper.exists(lastChildId, new LockWatcher()); + if (stat != null) { + return Boolean.FALSE; } else { - if (isOwner()) { - if (callback != null) { - callback.lockAcquired(); - } - return Boolean.TRUE; + LOG.warn("Could not find the" + + " stats for less than me: " + lastChildName.getName()); + } + } else { + if (isOwner()) { + LockListener lockListener = getLockListener(); + if (lockListener != null) { + lockListener.lockAcquired(); } + return Boolean.TRUE; } } } diff --git a/zookeeper-recipes/zookeeper-recipes-queue/src/main/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java b/zookeeper-recipes/zookeeper-recipes-queue/src/main/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java index c35c332e251..ca01541f545 100644 --- a/zookeeper-recipes/zookeeper-recipes-queue/src/main/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java +++ b/zookeeper-recipes/zookeeper-recipes-queue/src/main/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java @@ -87,7 +87,7 @@ private Map orderedChildren(Watcher watcher) throws KeeperException continue; } String suffix = childName.substring(prefix.length()); - Long childId = new Long(suffix); + Long childId = Long.parseLong(suffix); orderedChildren.put(childId,childName); }catch(NumberFormatException e){ LOG.warn("Found child node with improper format : " + childName + " " + e,e); @@ -209,7 +209,7 @@ public byte[] remove() throws NoSuchElementException, KeeperException, Interrupt } } - private class LatchChildWatcher implements Watcher { + private static class LatchChildWatcher implements Watcher { CountDownLatch latch; diff --git a/zookeeper-server/pom.xml b/zookeeper-server/pom.xml index 90624693524..90dff72fbca 100755 --- a/zookeeper-server/pom.xml +++ b/zookeeper-server/pom.xml @@ -50,6 +50,10 @@ commons-collections test + + commons-lang + commons-lang + org.apache.zookeeper zookeeper-jute @@ -267,4 +271,4 @@ - \ No newline at end of file + diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/common/SSLContextAndOptions.java b/zookeeper-server/src/main/java/org/apache/zookeeper/common/SSLContextAndOptions.java new file mode 100644 index 00000000000..2d60ab83ab1 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/SSLContextAndOptions.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.common; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.net.Socket; +import java.util.Arrays; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLParameters; +import javax.net.ssl.SSLServerSocket; +import javax.net.ssl.SSLSocket; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Objects.requireNonNull; + +/** + * Wrapper class for an SSLContext + some config options that can't be set on the context when it is created but + * must be set on a secure socket created by the context after the socket creation. By wrapping the options in this + * class we avoid reading from global system properties during socket configuration. This makes testing easier + * since we can create different X509Util instances with different configurations in a single test process, and + * unit test interactions between them. + */ +public class SSLContextAndOptions { + private static final Logger LOG = LoggerFactory.getLogger(SSLContextAndOptions.class); + + private final X509Util x509Util; + private final String[] enabledProtocols; + private final String[] cipherSuites; + private final X509Util.ClientAuth clientAuth; + private final SSLContext sslContext; + private final int handshakeDetectionTimeoutMillis; + + /** + * Note: constructor is intentionally package-private, only the X509Util class should be creating instances of this + * class. + * @param x509Util the X509Util that created this object. + * @param config a ZKConfig that holds config properties. + * @param sslContext the SSLContext. + */ + SSLContextAndOptions(final X509Util x509Util, final ZKConfig config, final SSLContext sslContext) { + this.x509Util = requireNonNull(x509Util); + this.sslContext = requireNonNull(sslContext); + this.enabledProtocols = getEnabledProtocols(requireNonNull(config), sslContext); + this.cipherSuites = getCipherSuites(config); + this.clientAuth = getClientAuth(config); + this.handshakeDetectionTimeoutMillis = getHandshakeDetectionTimeoutMillis(config); + } + + public SSLContext getSSLContext() { + return sslContext; + } + + public SSLSocket createSSLSocket() throws IOException { + return configureSSLSocket((SSLSocket) sslContext.getSocketFactory().createSocket(), true); + } + + public SSLSocket createSSLSocket(Socket socket, byte[] pushbackBytes) throws IOException { + SSLSocket sslSocket; + if (pushbackBytes != null && pushbackBytes.length > 0) { + sslSocket = (SSLSocket) sslContext.getSocketFactory().createSocket( + socket, new ByteArrayInputStream(pushbackBytes), true); + } else { + sslSocket = (SSLSocket) sslContext.getSocketFactory().createSocket( + socket, null, socket.getPort(), true); + } + return configureSSLSocket(sslSocket, false); + } + + public SSLServerSocket createSSLServerSocket() throws IOException { + SSLServerSocket sslServerSocket = + (SSLServerSocket) sslContext.getServerSocketFactory().createServerSocket(); + return configureSSLServerSocket(sslServerSocket); + } + + public SSLServerSocket createSSLServerSocket(int port) throws IOException { + SSLServerSocket sslServerSocket = + (SSLServerSocket) sslContext.getServerSocketFactory().createServerSocket(port); + return configureSSLServerSocket(sslServerSocket); + } + + public int getHandshakeDetectionTimeoutMillis() { + return handshakeDetectionTimeoutMillis; + } + + private SSLSocket configureSSLSocket(SSLSocket socket, boolean isClientSocket) { + SSLParameters sslParameters = socket.getSSLParameters(); + configureSslParameters(sslParameters, isClientSocket); + socket.setSSLParameters(sslParameters); + socket.setUseClientMode(isClientSocket); + return socket; + } + + private SSLServerSocket configureSSLServerSocket(SSLServerSocket socket) { + SSLParameters sslParameters = socket.getSSLParameters(); + configureSslParameters(sslParameters, false); + socket.setSSLParameters(sslParameters); + socket.setUseClientMode(false); + return socket; + } + + private void configureSslParameters(SSLParameters sslParameters, boolean isClientSocket) { + if (cipherSuites != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setup cipher suites for {} socket: {}", + isClientSocket ? "client" : "server", + Arrays.toString(cipherSuites)); + } + sslParameters.setCipherSuites(cipherSuites); + } + if (enabledProtocols != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setup enabled protocols for {} socket: {}", + isClientSocket ? "client" : "server", + Arrays.toString(enabledProtocols)); + } + sslParameters.setProtocols(enabledProtocols); + } + if (!isClientSocket) { + switch (clientAuth) { + case NEED: + sslParameters.setNeedClientAuth(true); + break; + case WANT: + sslParameters.setWantClientAuth(true); + break; + default: + sslParameters.setNeedClientAuth(false); // also clears the wantClientAuth flag according to docs + break; + } + } + } + + private String[] getEnabledProtocols(final ZKConfig config, final SSLContext sslContext) { + String enabledProtocolsInput = config.getProperty(x509Util.getSslEnabledProtocolsProperty()); + if (enabledProtocolsInput == null) { + return new String[] { sslContext.getProtocol() }; + } + return enabledProtocolsInput.split(","); + } + + private String[] getCipherSuites(final ZKConfig config) { + String cipherSuitesInput = config.getProperty(x509Util.getSslCipherSuitesProperty()); + if (cipherSuitesInput == null) { + return X509Util.getDefaultCipherSuites(); + } else { + return cipherSuitesInput.split(","); + } + } + + private X509Util.ClientAuth getClientAuth(final ZKConfig config) { + return X509Util.ClientAuth.fromPropertyValue(config.getProperty(x509Util.getSslClientAuthProperty())); + } + + private int getHandshakeDetectionTimeoutMillis(final ZKConfig config) { + String propertyString = config.getProperty(x509Util.getSslHandshakeDetectionTimeoutMillisProperty()); + int result; + if (propertyString == null) { + result = X509Util.DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS; + } else { + result = Integer.parseInt(propertyString); + if (result < 1) { + // Timeout of 0 is not allowed, since an infinite timeout can permanently lock up an + // accept() thread. + LOG.warn("Invalid value for {}: {}, using the default value of {}", + x509Util.getSslHandshakeDetectionTimeoutMillisProperty(), + result, + X509Util.DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS); + result = X509Util.DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS; + } + } + return result; + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/common/X509Util.java b/zookeeper-server/src/main/java/org/apache/zookeeper/common/X509Util.java index 310f3e66fa3..3ca787fc34d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/common/X509Util.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/X509Util.java @@ -34,6 +34,7 @@ import java.security.cert.PKIXBuilderParameters; import java.security.cert.X509CertSelector; import java.util.Arrays; +import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.CertPathTrustManagerParameters; @@ -97,7 +98,38 @@ public abstract class X509Util implements Closeable, AutoCloseable { public static final int DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS = 5000; + /** + * Enum specifying the client auth requirement of server-side TLS sockets created by this X509Util. + *
    + *
  • NONE - do not request a client certificate.
  • + *
  • WANT - request a client certificate, but allow anonymous clients to connect.
  • + *
  • NEED - require a client certificate, disconnect anonymous clients.
  • + *
+ * + * If the config property is not set, the default value is NEED. + */ + public enum ClientAuth { + NONE, + WANT, + NEED; + + /** + * Converts a property value to a ClientAuth enum. If the input string is empty or null, returns + * ClientAuth.NEED. + * @param prop the property string. + * @return the ClientAuth. + * @throws IllegalArgumentException if the property value is not "NONE", "WANT", "NEED", or empty/null. + */ + public static ClientAuth fromPropertyValue(String prop) { + if (prop == null || prop.length() == 0) { + return NEED; + } + return ClientAuth.valueOf(prop.toUpperCase()); + } + } + private String sslProtocolProperty = getConfigPrefix() + "protocol"; + private String sslEnabledProtocolsProperty = getConfigPrefix() + "enabledProtocols"; private String cipherSuitesProperty = getConfigPrefix() + "ciphersuites"; private String sslKeystoreLocationProperty = getConfigPrefix() + "keyStore.location"; private String sslKeystorePasswdProperty = getConfigPrefix() + "keyStore.password"; @@ -108,30 +140,36 @@ public abstract class X509Util implements Closeable, AutoCloseable { private String sslHostnameVerificationEnabledProperty = getConfigPrefix() + "hostnameVerification"; private String sslCrlEnabledProperty = getConfigPrefix() + "crl"; private String sslOcspEnabledProperty = getConfigPrefix() + "ocsp"; + private String sslClientAuthProperty = getConfigPrefix() + "clientAuth"; private String sslHandshakeDetectionTimeoutMillisProperty = getConfigPrefix() + "handshakeDetectionTimeoutMillis"; - private String[] cipherSuites; + private ZKConfig zkConfig; + private AtomicReference defaultSSLContextAndOptions = new AtomicReference<>(null); - private AtomicReference defaultSSLContext = new AtomicReference<>(null); private FileChangeWatcher keyStoreFileWatcher; private FileChangeWatcher trustStoreFileWatcher; public X509Util() { - String cipherSuitesInput = System.getProperty(cipherSuitesProperty); - if (cipherSuitesInput == null) { - cipherSuites = getDefaultCipherSuites(); - } else { - cipherSuites = cipherSuitesInput.split(","); - } + this(null); + } + + public X509Util(ZKConfig zkConfig) { + this.zkConfig = zkConfig; + keyStoreFileWatcher = trustStoreFileWatcher = null; } protected abstract String getConfigPrefix(); + protected abstract boolean shouldVerifyClientHostname(); public String getSslProtocolProperty() { return sslProtocolProperty; } + public String getSslEnabledProtocolsProperty() { + return sslEnabledProtocolsProperty; + } + public String getCipherSuitesProperty() { return cipherSuitesProperty; } @@ -140,6 +178,10 @@ public String getSslKeystoreLocationProperty() { return sslKeystoreLocationProperty; } + public String getSslCipherSuitesProperty() { + return cipherSuitesProperty; + } + public String getSslKeystorePasswdProperty() { return sslKeystorePasswdProperty; } @@ -172,6 +214,10 @@ public String getSslOcspEnabledProperty() { return sslOcspEnabledProperty; } + public String getSslClientAuthProperty() { + return sslClientAuthProperty; + } + /** * Returns the config property key that controls the amount of time, in milliseconds, that the first * UnifiedServerSocket read operation will block for when trying to detect the client mode (TLS or PLAINTEXT). @@ -183,30 +229,37 @@ public String getSslHandshakeDetectionTimeoutMillisProperty() { } public SSLContext getDefaultSSLContext() throws X509Exception.SSLContextException { - SSLContext result = defaultSSLContext.get(); + return getDefaultSSLContextAndOptions().getSSLContext(); + } + + public SSLContext createSSLContext(ZKConfig config) throws SSLContextException { + return createSSLContextAndOptions(config).getSSLContext(); + } + + public SSLContextAndOptions getDefaultSSLContextAndOptions() throws X509Exception.SSLContextException { + SSLContextAndOptions result = defaultSSLContextAndOptions.get(); if (result == null) { - result = createSSLContext(); - if (!defaultSSLContext.compareAndSet(null, result)) { + result = createSSLContextAndOptions(); + if (!defaultSSLContextAndOptions.compareAndSet(null, result)) { // lost the race, another thread already set the value - result = defaultSSLContext.get(); + result = defaultSSLContextAndOptions.get(); } } return result; } - private void resetDefaultSSLContext() throws X509Exception.SSLContextException { - SSLContext newContext = createSSLContext(); - defaultSSLContext.set(newContext); + private void resetDefaultSSLContextAndOptions() throws X509Exception.SSLContextException { + SSLContextAndOptions newContext = createSSLContextAndOptions(); + defaultSSLContextAndOptions.set(newContext); } - private SSLContext createSSLContext() throws SSLContextException { + private SSLContextAndOptions createSSLContextAndOptions() throws SSLContextException { /* * Since Configuration initializes the key store and trust store related * configuration from system property. Reading property from * configuration will be same reading from system property */ - ZKConfig config=new ZKConfig(); - return createSSLContext(config); + return createSSLContextAndOptions(zkConfig == null ? new ZKConfig() : zkConfig); } /** @@ -217,24 +270,19 @@ private SSLContext createSSLContext() throws SSLContextException { * @return the handshake detection timeout, in milliseconds. */ public int getSslHandshakeTimeoutMillis() { - String propertyString = System.getProperty(getSslHandshakeDetectionTimeoutMillisProperty()); - int result; - if (propertyString == null) { - result = DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS; - } else { - result = Integer.parseInt(propertyString); - if (result < 1) { - // Timeout of 0 is not allowed, since an infinite timeout can permanently lock up an - // accept() thread. - LOG.warn("Invalid value for " + getSslHandshakeDetectionTimeoutMillisProperty() + ": " + result + - ", using the default value of " + DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS); - result = DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS; - } + try { + SSLContextAndOptions ctx = getDefaultSSLContextAndOptions(); + return ctx.getHandshakeDetectionTimeoutMillis(); + } catch (SSLContextException e) { + LOG.error("Error creating SSL context and options", e); + return DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS; + } catch (Exception e) { + LOG.error("Error parsing config property " + getSslHandshakeDetectionTimeoutMillisProperty(), e); + return DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS; } - return result; } - public SSLContext createSSLContext(ZKConfig config) throws SSLContextException { + public SSLContextAndOptions createSSLContextAndOptions(ZKConfig config) throws SSLContextException { KeyManager[] keyManagers = null; TrustManager[] trustManagers = null; @@ -284,12 +332,12 @@ public SSLContext createSSLContext(ZKConfig config) throws SSLContextException { } } - String protocol = System.getProperty(sslProtocolProperty, DEFAULT_PROTOCOL); + String protocol = config.getProperty(sslProtocolProperty, DEFAULT_PROTOCOL); try { SSLContext sslContext = SSLContext.getInstance(protocol); sslContext.init(keyManagers, trustManagers, null); - return sslContext; - } catch (NoSuchAlgorithmException|KeyManagementException sslContextInitException) { + return new SSLContextAndOptions(this, config, sslContext); + } catch (NoSuchAlgorithmException | KeyManagementException sslContextInitException) { throw new SSLContextException(sslContextInitException); } } @@ -414,64 +462,40 @@ public static X509TrustManager createTrustManager( } public SSLSocket createSSLSocket() throws X509Exception, IOException { - SSLSocket sslSocket = (SSLSocket) getDefaultSSLContext().getSocketFactory().createSocket(); - configureSSLSocket(sslSocket); - sslSocket.setUseClientMode(true); - return sslSocket; + return getDefaultSSLContextAndOptions().createSSLSocket(); } public SSLSocket createSSLSocket(Socket socket, byte[] pushbackBytes) throws X509Exception, IOException { - SSLSocket sslSocket; - if (pushbackBytes != null && pushbackBytes.length > 0) { - sslSocket = (SSLSocket) getDefaultSSLContext().getSocketFactory().createSocket( - socket, new ByteArrayInputStream(pushbackBytes), true); - } else { - sslSocket = (SSLSocket) getDefaultSSLContext().getSocketFactory().createSocket( - socket, null, socket.getPort(), true); - } - configureSSLSocket(sslSocket); - sslSocket.setUseClientMode(false); - sslSocket.setNeedClientAuth(true); - return sslSocket; - } - - private void configureSSLSocket(SSLSocket sslSocket) { - SSLParameters sslParameters = sslSocket.getSSLParameters(); - LOG.debug("Setup cipher suites for client socket: {}", Arrays.toString(cipherSuites)); - sslParameters.setCipherSuites(cipherSuites); - sslSocket.setSSLParameters(sslParameters); + return getDefaultSSLContextAndOptions().createSSLSocket(socket, pushbackBytes); } public SSLServerSocket createSSLServerSocket() throws X509Exception, IOException { - SSLServerSocket sslServerSocket = (SSLServerSocket) getDefaultSSLContext().getServerSocketFactory().createServerSocket(); - configureSSLServerSocket(sslServerSocket); - - return sslServerSocket; + return getDefaultSSLContextAndOptions().createSSLServerSocket(); } public SSLServerSocket createSSLServerSocket(int port) throws X509Exception, IOException { - SSLServerSocket sslServerSocket = (SSLServerSocket) getDefaultSSLContext().getServerSocketFactory().createServerSocket(port); - configureSSLServerSocket(sslServerSocket); - - return sslServerSocket; + return getDefaultSSLContextAndOptions().createSSLServerSocket(port); } - private void configureSSLServerSocket(SSLServerSocket sslServerSocket) { - SSLParameters sslParameters = sslServerSocket.getSSLParameters(); - sslParameters.setNeedClientAuth(true); - LOG.debug("Setup cipher suites for server socket: {}", Arrays.toString(cipherSuites)); - sslParameters.setCipherSuites(cipherSuites); - sslServerSocket.setSSLParameters(sslParameters); + static String[] getDefaultCipherSuites() { + return getDefaultCipherSuitesForJavaVersion(System.getProperty("java.specification.version")); } - private String[] getDefaultCipherSuites() { - String javaVersion = System.getProperty("java.specification.version"); - if ("9".equals(javaVersion)) { - LOG.debug("Using Java9-optimized cipher suites for Java version {}", javaVersion); + static String[] getDefaultCipherSuitesForJavaVersion(String javaVersion) { + Objects.requireNonNull(javaVersion); + if (javaVersion.matches("\\d+")) { + // Must be Java 9 or later + LOG.debug("Using Java9+ optimized cipher suites for Java version {}", javaVersion); return DEFAULT_CIPHERS_JAVA9; + } else if (javaVersion.startsWith("1.")) { + // Must be Java 1.8 or earlier + LOG.debug("Using Java8 optimized cipher suites for Java version {}", javaVersion); + return DEFAULT_CIPHERS_JAVA8; + } else { + LOG.debug("Could not parse java version {}, using Java8 optimized cipher suites", + javaVersion); + return DEFAULT_CIPHERS_JAVA8; } - LOG.debug("Using Java8-optimized cipher suites for Java version {}", javaVersion); - return DEFAULT_CIPHERS_JAVA8; } private FileChangeWatcher newFileChangeWatcher(String fileLocation) throws IOException { @@ -498,7 +522,7 @@ private FileChangeWatcher newFileChangeWatcher(String fileLocation) throws IOExc */ public void enableCertFileReloading() throws IOException { LOG.info("enabling cert file reloading"); - ZKConfig config = new ZKConfig(); + ZKConfig config = zkConfig == null ? new ZKConfig() : zkConfig; FileChangeWatcher newKeyStoreFileWatcher = newFileChangeWatcher(config.getProperty(sslKeystoreLocationProperty)); if (newKeyStoreFileWatcher != null) { @@ -563,7 +587,7 @@ private void handleWatchEvent(Path filePath, WatchEvent event) { event.kind() + " with context: " + event.context()); } try { - this.resetDefaultSSLContext(); + this.resetDefaultSSLContextAndOptions(); } catch (SSLContextException e) { throw new RuntimeException(e); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/common/ZKConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/common/ZKConfig.java index 086c07ee812..43bc2d8e95c 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/common/ZKConfig.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/ZKConfig.java @@ -115,6 +115,12 @@ protected void handleBackwardCompatibility() { } private void putSSLProperties(X509Util x509Util) { + properties.put(x509Util.getSslProtocolProperty(), + System.getProperty(x509Util.getSslProtocolProperty())); + properties.put(x509Util.getSslEnabledProtocolsProperty(), + System.getProperty(x509Util.getSslEnabledProtocolsProperty())); + properties.put(x509Util.getSslCipherSuitesProperty(), + System.getProperty(x509Util.getSslCipherSuitesProperty())); properties.put(x509Util.getSslKeystoreLocationProperty(), System.getProperty(x509Util.getSslKeystoreLocationProperty())); properties.put(x509Util.getSslKeystorePasswdProperty(), @@ -133,6 +139,8 @@ private void putSSLProperties(X509Util x509Util) { System.getProperty(x509Util.getSslCrlEnabledProperty())); properties.put(x509Util.getSslOcspEnabledProperty(), System.getProperty(x509Util.getSslOcspEnabledProperty())); + properties.put(x509Util.getSslClientAuthProperty(), + System.getProperty(x509Util.getSslClientAuthProperty())); properties.put(x509Util.getSslHandshakeDetectionTimeoutMillisProperty(), System.getProperty(x509Util.getSslHandshakeDetectionTimeoutMillisProperty())); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java new file mode 100644 index 00000000000..1aa9e5b838d --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import java.util.Random; + +import org.apache.zookeeper.common.Time; + +/** + * Implements a token-bucket based rate limiting mechanism with optional + * probabilistic dropping inspired by the BLUE queue management algorithm [1]. + * + * The throttle provides the {@link #checkLimit(int)} method which provides + * a binary yes/no decision. + * + * The core token bucket algorithm starts with an initial set of tokens based + * on the maxTokens setting. Tokens are dispensed each + * {@link #checkLimit(int)} call, which fails if there are not enough tokens to + * satisfy a given request. + * + * The token bucket refills over time, providing fillCount tokens + * every fillTime milliseconds, capping at maxTokens. + * + * This design allows the throttle to allow short bursts to pass, while still + * capping the total number of requests per time interval. + * + * One issue with a pure token bucket approach for something like request or + * connection throttling is that the wall clock arrival time of requests affects + * the probability of a request being allowed to pass or not. Under constant + * load this can lead to request starvation for requests that constantly arrive + * later than the majority. + * + * In an attempt to combat this, this throttle can also provide probabilistic + * dropping. This is enabled anytime freezeTime is set to a value + * other than -1. + * + * The probabilistic algorithm starts with an initial drop probability of 0, and + * adjusts this probability roughly every freezeTime milliseconds. + * The first request after freezeTime, the algorithm checks the + * token bucket. If the token bucket is empty, the drop probability is increased + * by dropIncrease up to a maximum of 1. Otherwise, if + * the bucket has a token deficit less than decreasePoint * maxTokens, + * the probability is decreased by dropDecrease. + * + * Given a call to {@link #checkLimit(int)}, requests are first dropped randomly + * based on the current drop probability, and only surviving requests are then + * checked against the token bucket. + * + * When under constant load, the probabilistic algorithm will adapt to a drop + * frequency that should keep requests within the token limit. When load drops, + * the drop probability will decrease, eventually returning to zero if possible. + * + * [1] "BLUE: A New Class of Active Queue Management Algorithms" + **/ + +public class BlueThrottle { + private int maxTokens; + private int fillTime; + private int fillCount; + private int tokens; + private long lastTime; + + private int freezeTime; + private long lastFreeze; + private double dropIncrease; + private double dropDecrease; + private double decreasePoint; + private double drop; + + Random rng; + + public static final String CONNECTION_THROTTLE_TOKENS = "zookeeper.connection_throttle_tokens"; + public static final int DEFAULT_CONNECTION_THROTTLE_TOKENS; + + public static final String CONNECTION_THROTTLE_FILL_TIME = "zookeeper.connection_throttle_fill_time"; + public static final int DEFAULT_CONNECTION_THROTTLE_FILL_TIME; + + public static final String CONNECTION_THROTTLE_FILL_COUNT = "zookeeper.connection_throttle_fill_count"; + public static final int DEFAULT_CONNECTION_THROTTLE_FILL_COUNT; + + public static final String CONNECTION_THROTTLE_FREEZE_TIME = "zookeeper.connection_throttle_freeze_time"; + public static final int DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME; + + public static final String CONNECTION_THROTTLE_DROP_INCREASE = "zookeeper.connection_throttle_drop_increase"; + public static final double DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE; + + public static final String CONNECTION_THROTTLE_DROP_DECREASE = "zookeeper.connection_throttle_drop_decrease"; + public static final double DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE; + + public static final String CONNECTION_THROTTLE_DECREASE_RATIO = "zookeeper.connection_throttle_decrease_ratio"; + public static final double DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO; + + + static { + DEFAULT_CONNECTION_THROTTLE_TOKENS = Integer.getInteger(CONNECTION_THROTTLE_TOKENS, 0); + DEFAULT_CONNECTION_THROTTLE_FILL_TIME = Integer.getInteger(CONNECTION_THROTTLE_FILL_TIME, 1); + DEFAULT_CONNECTION_THROTTLE_FILL_COUNT = Integer.getInteger(CONNECTION_THROTTLE_FILL_COUNT, 1); + + DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME = Integer.getInteger(CONNECTION_THROTTLE_FREEZE_TIME, -1); + DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE = getDoubleProp(CONNECTION_THROTTLE_DROP_INCREASE, 0.02); + DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE = getDoubleProp(CONNECTION_THROTTLE_DROP_DECREASE, 0.002); + DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO = getDoubleProp(CONNECTION_THROTTLE_DECREASE_RATIO, 0); + } + + /* Varation of Integer.getInteger for real number properties */ + private static double getDoubleProp(String name, double def) { + String val = System.getProperty(name); + if(val != null) { + return Double.parseDouble(val); + } + else { + return def; + } + } + + + public BlueThrottle() { + // Disable throttling by default (maxTokens = 0) + this.maxTokens = DEFAULT_CONNECTION_THROTTLE_TOKENS; + this.fillTime = DEFAULT_CONNECTION_THROTTLE_FILL_TIME; + this.fillCount = DEFAULT_CONNECTION_THROTTLE_FILL_COUNT; + this.tokens = maxTokens; + this.lastTime = Time.currentElapsedTime(); + + // Disable BLUE throttling by default (freezeTime = -1) + this.freezeTime = DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME; + this.lastFreeze = Time.currentElapsedTime(); + this.dropIncrease = DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE; + this.dropDecrease = DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE; + this.decreasePoint = DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO; + this.drop = 0; + + this.rng = new Random(); + } + + public synchronized void setMaxTokens(int max) { + int deficit = maxTokens - tokens; + maxTokens = max; + tokens = max - deficit; + } + + public synchronized void setFillTime(int time) { + fillTime = time; + } + + public synchronized void setFillCount(int count) { + fillCount = count; + } + + public synchronized void setFreezeTime(int time) { + freezeTime = time; + } + + public synchronized void setDropIncrease(double increase) { + dropIncrease = increase; + } + + public synchronized void setDropDecrease(double decrease) { + dropDecrease = decrease; + } + + public synchronized void setDecreasePoint(double ratio) { + decreasePoint = ratio; + } + + public synchronized int getMaxTokens() { + return maxTokens; + } + + public synchronized int getFillTime() { + return fillTime; + } + + public synchronized int getFillCount() { + return fillCount; + } + + public synchronized int getFreezeTime() { + return freezeTime; + } + + public synchronized double getDropIncrease() { + return dropIncrease; + } + + public synchronized double getDropDecrease() { + return dropDecrease; + } + + public synchronized double getDecreasePoint() { + return decreasePoint; + } + + public synchronized double getDropChance() { + return drop; + } + + public synchronized int getDeficit() { + return maxTokens - tokens; + } + + public synchronized boolean checkLimit(int need) { + // A maxTokens setting of zero disables throttling + if (maxTokens == 0) + return true; + + long now = Time.currentElapsedTime(); + long diff = now - lastTime; + + if (diff > fillTime) { + int refill = (int)(diff * fillCount / fillTime); + tokens = Math.min(tokens + refill, maxTokens); + lastTime = now; + } + + // A freeze time of -1 disables BLUE randomized throttling + if(freezeTime != -1) { + if(!checkBlue(now)) { + return false; + } + } + + if (tokens < need) { + return false; + } + + tokens -= need; + return true; + } + + public synchronized boolean checkBlue(long now) { + int length = maxTokens - tokens; + int limit = maxTokens; + long diff = now - lastFreeze; + long threshold = Math.round(maxTokens * decreasePoint); + + if (diff > freezeTime) { + if((length == limit) && (drop < 1)) { + drop = Math.min(drop + dropIncrease, 1); + } + else if ((length <= threshold) && (drop > 0)) { + drop = Math.max(drop - dropDecrease, 0); + } + lastFreeze = now; + } + + if (rng.nextDouble() < drop) { + return false; + } + return true; + } +}; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ClientCnxnLimitException.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ClientCnxnLimitException.java new file mode 100644 index 00000000000..38f89959a5e --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ClientCnxnLimitException.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +/** + * Indicates that the number of client connections has exceeded some limit. + * @see org.apache.zookeeper.server.ClientCnxnLimit#checkLimit() + * @see org.apache.zookeeper.server.ClientCnxnLimit#checkLimit(int) + */ +public class ClientCnxnLimitException extends Exception { + public ClientCnxnLimitException() { + super("Connection throttle rejected connection"); + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java index 38ebd3fe8a2..07efbdfab52 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java @@ -505,7 +505,7 @@ public void processRequest(Request request) { // so these values are passed along with the response. GetDataResponse getDataResponse = (GetDataResponse)rsp; Stat stat = null; - if (getDataResponse != null && getDataResponse.getStat() != null) { + if (getDataResponse.getStat() != null) { stat = getDataResponse.getStat(); } cnxn.sendResponse(hdr, rsp, "response", path, stat); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java index c2ab78487a8..f7e382facc4 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java @@ -151,7 +151,7 @@ public void sendBuffer(ByteBuffer... buffers) { } /** Read the request payload (everything following the length prefix) */ - private void readPayload() throws IOException, InterruptedException { + private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException { if (incomingBuffer.remaining() != 0) { // have we read length bytes? int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok if (rc < 0) { @@ -360,6 +360,14 @@ void doIO(SelectionKey k) throws InterruptedException { LOG.warn(e.getMessage()); // expecting close to log session closure close(); + } catch (ClientCnxnLimitException e) { + // Common case exception, print at debug level + ServerMetrics.CONNECTION_REJECTED.add(1); + if (LOG.isDebugEnabled()) { + LOG.debug("Exception causing close of session 0x" + + Long.toHexString(sessionId) + ": " + e.getMessage()); + } + close(); } catch (IOException e) { LOG.warn("Exception causing close of session 0x" + Long.toHexString(sessionId) + ": " + e.getMessage()); @@ -407,7 +415,7 @@ public void enableRecv() { } } - private void readConnectRequest() throws IOException, InterruptedException { + private void readConnectRequest() throws IOException, InterruptedException, ClientCnxnLimitException { if (!isZKServerRunning()) { throw new IOException("ZooKeeperServer not running"); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java index 4e7e5db45f8..090ee7b2c71 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java @@ -310,6 +310,7 @@ private boolean doAccept() { acceptErrorLogger.flush(); } catch (IOException e) { // accept, maxClientCnxns, configureBlocking + ServerMetrics.CONNECTION_REJECTED.add(1); acceptErrorLogger.rateLimitLog( "Error accepting new connection: " + e.getMessage()); fastCloseSock(sc); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java index b6bb343f49d..8b4f70f7156 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java @@ -486,6 +486,13 @@ private void receiveMessage(ByteBuf message) { } catch(IOException e) { LOG.warn("Closing connection to " + getRemoteSocketAddress(), e); close(); + } catch(ClientCnxnLimitException e) { + // Common case exception, print at debug level + ServerMetrics.CONNECTION_REJECTED.add(1); + if (LOG.isDebugEnabled()) { + LOG.debug("Closing connection to " + getRemoteSocketAddress(), e); + } + close(); } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java index d96f56de9f0..e0d55a4fbc1 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java @@ -108,6 +108,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { InetAddress addr = ((InetSocketAddress) channel.remoteAddress()) .getAddress(); if (maxClientCnxns > 0 && getClientCnxnCount(addr) >= maxClientCnxns) { + ServerMetrics.CONNECTION_REJECTED.add(1); LOG.warn("Too many connections from {} - max is {}", addr, maxClientCnxns); channel.close(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java index 3420b88e8b8..dcc04a3344c 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java @@ -67,6 +67,10 @@ public enum ServerMetrics { SNAP_COUNT(new SimpleCounter("snap_count")), COMMIT_COUNT(new SimpleCounter("commit_count")), CONNECTION_REQUEST_COUNT(new SimpleCounter("connection_request_count")), + // Connection throttling related + CONNECTION_TOKEN_DEFICIT(new AvgMinMaxCounter("connection_token_deficit")), + CONNECTION_REJECTED(new SimpleCounter("connection_rejected")), + BYTES_RECEIVED_COUNT(new SimpleCounter("bytes_received_count")), RESPONSE_PACKET_CACHE_HITS(new SimpleCounter("response_packet_cache_hits")), diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 833c79bab0d..ee0e4c2cb69 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -164,6 +164,9 @@ protected enum State { LOG.info(INT_BUFFER_STARTING_SIZE_BYTES + " = " + intBufferStartingSizeBytes); } + // Connection throttling + private BlueThrottle connThrottle; + void removeCnxn(ServerCnxn cnxn) { zkDb.removeCnxn(cnxn); } @@ -196,7 +199,11 @@ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, setMinSessionTimeout(minSessionTimeout); setMaxSessionTimeout(maxSessionTimeout); listener = new ZooKeeperServerListenerImpl(this); + readResponseCache = new ResponseCache(); + + connThrottle = new BlueThrottle(); + LOG.info("Created server with tickTime " + tickTime + " minSessionTimeout " + getMinSessionTimeout() + " maxSessionTimeout " + getMaxSessionTimeout() @@ -219,6 +226,10 @@ public ServerStats serverStats() { return serverStats; } + public BlueThrottle connThrottle() { + return connThrottle; + } + public void dumpConf(PrintWriter pwriter) { pwriter.print("clientPort="); pwriter.println(getClientPort()); @@ -1043,7 +1054,18 @@ public Map> getEphemerals() { return zkDb.getEphemerals(); } - public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { + public double getConnectionDropChance() { + return connThrottle.getDropChance(); + } + + public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) + throws IOException, ClientCnxnLimitException { + + if (connThrottle.checkLimit(1) == false) { + throw new ClientCnxnLimitException(); + } + ServerMetrics.CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit()); + BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer)); ConnectRequest connReq = new ConnectRequest(); connReq.deserialize(bia, "connect"); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java index deae98d9b40..b8cf70621cb 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java @@ -167,7 +167,7 @@ public String getSecureClientPort() { public String getSecureClientAddress() { if (zks.secureServerCnxnFactory != null) { return String.format("%s:%d", zks.secureServerCnxnFactory - .getLocalAddress().getHostString(), + .getLocalAddress().getHostString(), zks.secureServerCnxnFactory.getLocalPort()); } return ""; @@ -207,4 +207,75 @@ public boolean getResponseCachingEnabled() { public void setResponseCachingEnabled(boolean isEnabled) { zks.setResponseCachingEnabled(isEnabled); } + + // Connection throttling settings + /////////////////////////////////////////////////////////////////////////// + + public int getConnectionMaxTokens() { + return zks.connThrottle().getMaxTokens(); + } + + public void setConnectionMaxTokens(int val) { + zks.connThrottle().setMaxTokens(val); + } + + /////////////////////////////////////////////////////////////////////////// + + public int getConnectionTokenFillTime() { + return zks.connThrottle().getFillTime(); + } + + public void setConnectionTokenFillTime(int val) { + zks.connThrottle().setFillTime(val); + } + + /////////////////////////////////////////////////////////////////////////// + + public int getConnectionTokenFillCount() { + return zks.connThrottle().getFillCount(); + } + + public void setConnectionTokenFillCount(int val) { + zks.connThrottle().setFillCount(val); + } + + /////////////////////////////////////////////////////////////////////////// + + public int getConnectionFreezeTime() { + return zks.connThrottle().getFreezeTime(); + } + + public void setConnectionFreezeTime(int val) { + zks.connThrottle().setFreezeTime(val); + } + + /////////////////////////////////////////////////////////////////////////// + + public double getConnectionDropIncrease() { + return zks.connThrottle().getDropIncrease(); + } + + public void setConnectionDropIncrease(double val) { + zks.connThrottle().setDropIncrease(val); + } + + /////////////////////////////////////////////////////////////////////////// + + public double getConnectionDropDecrease() { + return zks.connThrottle().getDropDecrease(); + } + + public void setConnectionDropDecrease(double val) { + zks.connThrottle().setDropDecrease(val); + } + + /////////////////////////////////////////////////////////////////////////// + + public double getConnectionDecreaseRatio() { + return zks.connThrottle().getDecreasePoint(); + } + + public void setConnectionDecreaseRatio(double val) { + zks.connThrottle().setDecreasePoint(val); + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java index bd4d3498d2e..91c8c82bd42 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java @@ -98,6 +98,28 @@ public interface ZooKeeperServerMXBean { public boolean getResponseCachingEnabled(); public void setResponseCachingEnabled(boolean isEnabled); + /* Connection throttling settings */ + public int getConnectionMaxTokens(); + public void setConnectionMaxTokens(int val); + + public int getConnectionTokenFillTime(); + public void setConnectionTokenFillTime(int val); + + public int getConnectionTokenFillCount(); + public void setConnectionTokenFillCount(int val); + + public int getConnectionFreezeTime(); + public void setConnectionFreezeTime(int val); + + public double getConnectionDropIncrease(); + public void setConnectionDropIncrease(double val); + + public double getConnectionDropDecrease(); + public void setConnectionDropDecrease(double val); + + public double getConnectionDecreaseRatio(); + public void setConnectionDecreaseRatio(double val); + /** * Reset packet and latency statistics */ diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java index f1e5500563c..b0fa4ff010d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java @@ -354,6 +354,7 @@ public CommandResponse run(ZooKeeperServer zkServer, Map kwargs) OSMXBean osMbean = new OSMXBean(); response.put("open_file_descriptor_count", osMbean.getOpenFileDescriptorCount()); response.put("max_file_descriptor_count", osMbean.getMaxFileDescriptorCount()); + response.put("connection_drop_probability", zkServer.getConnectionDropChance()); response.put("last_client_response_size", stats.getClientResponseStats().getLastBufferSize()); response.put("max_client_response_size", stats.getClientResponseStats().getMaxBufferSize()); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java index 84e269c3923..439f5b05d7f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java @@ -68,7 +68,34 @@ public class FastLeaderElection implements Election { * the system up again after long partitions. Currently 60 seconds. */ - final static int maxNotificationInterval = 60000; + private static int maxNotificationInterval = 60000; + + /** + * Lower bound for notification check. The observer don't need to use + * the same lower bound as participant members + */ + private static int minNotificationInterval = finalizeWait; + + /** + * Minimum notification interval, default is equal to finalizeWait + */ + public static final String MIN_NOTIFICATION_INTERVAL = + "zookeeper.fastleader.minNotificationInterval"; + + /** + * Maximum notification interval, default is 60s + */ + public static final String MAX_NOTIFICATION_INTERVAL = + "zookeeper.fastleader.maxNotificationInterval"; + + static { + minNotificationInterval = Integer.getInteger(MIN_NOTIFICATION_INTERVAL, + minNotificationInterval); + LOG.info("{}={}", MIN_NOTIFICATION_INTERVAL, minNotificationInterval); + maxNotificationInterval = Integer.getInteger(MAX_NOTIFICATION_INTERVAL, + maxNotificationInterval); + LOG.info("{}={}", MAX_NOTIFICATION_INTERVAL, maxNotificationInterval); + } /** * Connection manager. Fast leader election uses TCP for @@ -684,16 +711,12 @@ private void sendNotifications() { } } - private void printNotification(Notification n){ - LOG.info("Notification: " - + Long.toHexString(n.version) + " (message format version), " - + n.leader + " (n.leader), 0x" - + Long.toHexString(n.zxid) + " (n.zxid), 0x" - + Long.toHexString(n.electionEpoch) + " (n.round), " + n.state - + " (n.state), " + n.sid + " (n.sid), 0x" - + Long.toHexString(n.peerEpoch) + " (n.peerEPoch), " - + self.getPeerState() + " (my state)" - + (n.qv!=null ? (Long.toHexString(n.qv.getVersion()) + " (n.config version)"):"")); + private void printNotification(Notification n) { + LOG.info("Notification: my state:{}; n.sid:{}, n.state:{}, n.leader:{}, n.round:0x{}, " + + "n.peerEpoch:0x{}, n.zxid:0x{}, message format version:0x{}, n.config version:0x{}", + self.getPeerState(), n.sid, n.state, n.leader, Long.toHexString(n.electionEpoch), + Long.toHexString(n.peerEpoch), Long.toHexString(n.zxid), Long.toHexString(n.version), + (n.qv != null ? (Long.toHexString(n.qv.getVersion())) : "0")); } @@ -898,7 +921,7 @@ public Vote lookForLeader() throws InterruptedException { Map outofelection = new HashMap(); - int notTimeout = finalizeWait; + int notTimeout = minNotificationInterval; synchronized(this){ logicalclock.incrementAndGet(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/common/X509UtilTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/common/X509UtilTest.java index 9c4a9b0f0b9..2a6bb3246f5 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/common/X509UtilTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/common/X509UtilTest.java @@ -482,6 +482,46 @@ public void handshakeCompleted(HandshakeCompletedEvent handshakeCompletedEvent) } } + @Test + public void testGetDefaultCipherSuitesJava8() { + String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("1.8"); + // Java 8 default should have the CBC suites first + Assert.assertTrue(cipherSuites[0].contains("CBC")); + } + + @Test + public void testGetDefaultCipherSuitesJava9() { + String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("9"); + // Java 9+ default should have the GCM suites first + Assert.assertTrue(cipherSuites[0].contains("GCM")); + } + + @Test + public void testGetDefaultCipherSuitesJava10() { + String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("10"); + // Java 9+ default should have the GCM suites first + Assert.assertTrue(cipherSuites[0].contains("GCM")); + } + + @Test + public void testGetDefaultCipherSuitesJava11() { + String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("11"); + // Java 9+ default should have the GCM suites first + Assert.assertTrue(cipherSuites[0].contains("GCM")); + } + + @Test + public void testGetDefaultCipherSuitesUnknownVersion() { + String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("notaversion"); + // If version can't be parsed, use the more conservative Java 8 default + Assert.assertTrue(cipherSuites[0].contains("CBC")); + } + + @Test(expected = NullPointerException.class) + public void testGetDefaultCipherSuitesNullVersion() { + X509Util.getDefaultCipherSuitesForJavaVersion(null); + } + // Warning: this will reset the x509Util private void setCustomCipherSuites() { System.setProperty(x509Util.getCipherSuitesProperty(), customCipherSuites[0] + "," + customCipherSuites[1]); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java new file mode 100644 index 00000000000..aa27a15f7f4 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server; + +import org.apache.zookeeper.ZKTestCase; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Random; + +public class BlueThrottleTest extends ZKTestCase { + private static final Logger LOG = LoggerFactory.getLogger(BlueThrottleTest.class); + + class MockRandom extends Random { + int flag = 0; + BlueThrottle throttle; + + @Override + public double nextDouble() { + if (throttle.getDropChance() > 0) { + flag = 1 - flag; + return flag; + } else { + return 1; + } + } + } + + class BlueThrottleWithMockRandom extends BlueThrottle { + public BlueThrottleWithMockRandom(MockRandom random) { + super(); + this.rng = random; + random.throttle = this; + } + } + + @Test + public void testThrottleDisabled() { + BlueThrottle throttler = new BlueThrottle(); + Assert.assertTrue("Throttle should be disabled by default", throttler.checkLimit(1)); + } + + @Test + public void testThrottleWithoutRefill() { + BlueThrottle throttler = new BlueThrottle(); + throttler.setMaxTokens(1); + throttler.setFillTime(2000); + Assert.assertTrue("First request should be allowed", throttler.checkLimit(1)); + Assert.assertFalse("Second request should be denied", throttler.checkLimit(1)); + } + + @Test + public void testThrottleWithRefill() throws InterruptedException { + BlueThrottle throttler = new BlueThrottle(); + throttler.setMaxTokens(1); + throttler.setFillTime(500); + Assert.assertTrue("First request should be allowed", throttler.checkLimit(1)); + Assert.assertFalse("Second request should be denied", throttler.checkLimit(1)); + + //wait for the bucket to be refilled + Thread.sleep(750); + Assert.assertTrue("Third request should be allowed since we've got a new token", throttler.checkLimit(1)); + } + + @Test + public void testThrottleWithoutRandomDropping() throws InterruptedException { + int maxTokens = 5; + BlueThrottle throttler = new BlueThrottleWithMockRandom(new MockRandom()); + throttler.setMaxTokens(maxTokens); + throttler.setFillCount(maxTokens); + throttler.setFillTime(1000); + + for (int i=0;i0); + + //allow bucket to be refilled + Thread.sleep(1500); + + for (int i=0;i0); + LOG.info("Dropping probability is {}", throttler.getDropChance()); + + //allow bucket to be refilled + Thread.sleep(1100); + LOG.info("Bucket is refilled with {} tokens.", maxTokens); + + int accepted = 0; + for (int i=0;i 0); + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java index 9b30c555dc0..000b3cea413 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java @@ -190,7 +190,8 @@ public void testMonitor() throws IOException, InterruptedException { new Field("min_client_response_size", Integer.class), new Field("uptime", Long.class), new Field("global_sessions", Long.class), - new Field("local_sessions", Long.class) + new Field("local_sessions", Long.class), + new Field("connection_drop_probability", Double.class) )); for (String metric : ServerMetrics.getAllValues().keySet()) { if (metric.startsWith("avg_")) {