From 0595ff62749d52896f2f813bbb14951e37715ebb Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Fri, 31 Jul 2015 14:34:44 +0200 Subject: [PATCH] Add close stale connections and strict cluster member check flags to ClusterClientOptions #109 Add two flags to ClusterClientOptions: closeStaleConnections: Close stale connections when refreshing the cluster topology Motivation: Connections to nodes, which do not belong to the cluster (anymore) are closed as soon as the cluster topology changes. If one node is no longer part of the cluster, the connections to the node can be closed. One might want to prevent that behavior because one might want still to communicate with the other nodes that come into play when using validateClusterNodeMembership = false validateClusterNodeMembership: Validate the cluster node membership before allowing connections to that node Motivation: The current implementation performs redirects using MOVED and ASK and allows obtaining connections to the particular cluster nodes. The validation was introduced during the development of version 3.3 to prevent security breaches and only allow connections to the known hosts of the CLUSTER NODES output. There are some scenarios, where the strict validation is an obstruction: MOVED/ASK redirection but the cluster topology view is stale Connecting to cluster nodes using different IP's/hostnames (e.g. private/public IP's) Connecting to non-cluster members to reconfigure those while using the RedisClusterClient connection. --- .../redis/cluster/ClusterClientOptions.java | 68 +++++++++++++++++-- .../PooledClusterConnectionProvider.java | 37 ++++++---- .../redis/cluster/RedisClusterClient.java | 60 ++++++++-------- .../cluster/AdvancedClusterClientTest.java | 21 +++--- .../redis/cluster/RedisClusterSetupTest.java | 37 ++++++++++ 5 files changed, 163 insertions(+), 60 deletions(-) diff --git a/src/main/java/com/lambdaworks/redis/cluster/ClusterClientOptions.java b/src/main/java/com/lambdaworks/redis/cluster/ClusterClientOptions.java index f314b5b75d..b4f0914ab4 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/ClusterClientOptions.java +++ b/src/main/java/com/lambdaworks/redis/cluster/ClusterClientOptions.java @@ -13,9 +13,11 @@ public class ClusterClientOptions extends ClientOptions { private final boolean refreshClusterView; private final long refreshPeriod; private final TimeUnit refreshPeriodUnit; + private final boolean closeStaleConnections; + private final boolean validateClusterNodeMembership; /** - * Create a copy of {@literal options} + * Create a copy of {@literal options}. * * @param options the original * @return A new instance of {@link ClusterClientOptions} containing the values of {@literal options} @@ -32,6 +34,8 @@ public static class Builder extends ClientOptions.Builder { private boolean refreshClusterView = false; private long refreshPeriod = 60; private TimeUnit refreshPeriodUnit = TimeUnit.SECONDS; + private boolean closeStaleConnections = true; + private boolean validateClusterNodeMembership = true; /** * Enable regular cluster topology updates. The client starts updating the cluster topology in the intervals of @@ -47,7 +51,7 @@ public Builder refreshClusterView(boolean refreshClusterView) { } /** - * Set the refresh period. Defaults to {@literal 60 SECONDS} + * Set the refresh period. Defaults to {@literal 60 SECONDS}. * * @param refreshPeriod period for triggering topology updates * @param refreshPeriodUnit unit for {@code refreshPeriod} @@ -59,6 +63,29 @@ public Builder refreshPeriod(long refreshPeriod, TimeUnit refreshPeriodUnit) { return this; } + /** + * Flag, whether to close stale connections when refreshing the cluster topology. Defaults to {@literal true}. Comes + * only into effect if {@link #isRefreshClusterView()} is {@literal true}. + * + * @param closeStaleConnections {@literal true} if stale connections are cleaned up after cluster topology updates + * @return {@code this} + */ + public Builder closeStaleConnections(boolean closeStaleConnections) { + this.closeStaleConnections = closeStaleConnections; + return this; + } + + /** + * Validate the cluster node membership before allowing connections to a cluster node. Defaults to {@literal true}. + * + * @param validateClusterNodeMembership {@literal true} if validation is enabled. + * @return {@code this} + */ + public Builder validateClusterNodeMembership(boolean validateClusterNodeMembership) { + this.validateClusterNodeMembership = validateClusterNodeMembership; + return this; + } + /** * Create a new instance of {@link ClusterClientOptions} * @@ -74,6 +101,8 @@ protected ClusterClientOptions(Builder builder) { this.refreshClusterView = builder.refreshClusterView; this.refreshPeriod = builder.refreshPeriod; this.refreshPeriodUnit = builder.refreshPeriodUnit; + this.closeStaleConnections = builder.closeStaleConnections; + this.validateClusterNodeMembership = builder.validateClusterNodeMembership; } protected ClusterClientOptions(ClusterClientOptions original) { @@ -81,37 +110,62 @@ protected ClusterClientOptions(ClusterClientOptions original) { this.refreshClusterView = original.refreshClusterView; this.refreshPeriod = original.refreshPeriod; this.refreshPeriodUnit = original.refreshPeriodUnit; + this.closeStaleConnections = original.closeStaleConnections; + this.validateClusterNodeMembership = original.validateClusterNodeMembership; } protected ClusterClientOptions() { this.refreshClusterView = false; this.refreshPeriod = 60; this.refreshPeriodUnit = TimeUnit.SECONDS; + this.closeStaleConnections = true; + this.validateClusterNodeMembership = true; } /** * Flag, whether regular cluster topology updates are updated. The client starts updating the cluster topology in the - * intervals of {@link #getRefreshPeriod()} /{@link #getRefreshPeriodUnit()}. Defaults to {@literal false} + * intervals of {@link #getRefreshPeriod()} /{@link #getRefreshPeriodUnit()}. Defaults to {@literal false}. * - * @return + * @return {@literal true} it the cluster topology view is updated periodically */ public boolean isRefreshClusterView() { return refreshClusterView; } /** - * - * @return the period between the regular cluster topology updates. + * Period between the regular cluster topology updates. Defaults to {@literal 60}. + * + * @return the period between the regular cluster topology updates */ public long getRefreshPeriod() { return refreshPeriod; } /** - * + * Unit for the {@link #getRefreshPeriod()}. Defaults to {@link TimeUnit#SECONDS}. + * * @return unit for the {@link #getRefreshPeriod()} */ public TimeUnit getRefreshPeriodUnit() { return refreshPeriodUnit; } + + /** + * Flag, whether to close stale connections when refreshing the cluster topology. Defaults to {@literal true}. Comes only + * into effect if {@link #isRefreshClusterView()} is {@literal true}. + * + * @return {@literal true} if stale connections are cleaned up after cluster topology updates + */ + public boolean isCloseStaleConnections() { + return closeStaleConnections; + } + + /** + * Validate the cluster node membership before allowing connections to a cluster node. Defaults to {@literal true}. + * + * @return {@literal true} if validation is enabled. + */ + public boolean isValidateClusterNodeMembership() { + return validateClusterNodeMembership; + } } diff --git a/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java b/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java index 3925e98538..d6bc6ad4d5 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java +++ b/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java @@ -15,11 +15,7 @@ import com.google.common.collect.Sets; import com.google.common.net.HostAndPort; import com.google.common.util.concurrent.UncheckedExecutionException; -import com.lambdaworks.redis.RedisAsyncConnection; -import com.lambdaworks.redis.RedisAsyncConnectionImpl; -import com.lambdaworks.redis.RedisChannelWriter; -import com.lambdaworks.redis.RedisException; -import com.lambdaworks.redis.RedisURI; +import com.lambdaworks.redis.*; import com.lambdaworks.redis.cluster.models.partitions.Partitions; import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; import com.lambdaworks.redis.codec.RedisCodec; @@ -43,6 +39,7 @@ class PooledClusterConnectionProvider implements ClusterConnectionProvider private final LoadingCache> connections; private final boolean debugEnabled; private final RedisAsyncConnectionImpl writers[] = new RedisAsyncConnectionImpl[SlotHash.SLOT_COUNT]; + private final RedisClusterClient redisClusterClient; private Partitions partitions; private boolean autoFlushCommands = true; @@ -50,6 +47,7 @@ class PooledClusterConnectionProvider implements ClusterConnectionProvider public PooledClusterConnectionProvider(RedisClusterClient redisClusterClient, RedisChannelWriter clusterWriter, RedisCodec redisCodec) { + this.redisClusterClient = redisClusterClient; this.debugEnabled = logger.isDebugEnabled(); this.connections = CacheBuilder.newBuilder().build( new ConnectionFactory(redisClusterClient, redisCodec, clusterWriter)); @@ -108,11 +106,13 @@ public RedisAsyncConnectionImpl getConnection(Intent intent, String host, logger.debug("getConnection(" + intent + ", " + host + ", " + port + ")"); } - RedisClusterNode redisClusterNode = getPartition(host, port); + if (validateClusterNodeMembership()) { + RedisClusterNode redisClusterNode = getPartition(host, port); - if (redisClusterNode == null) { - HostAndPort hostAndPort = HostAndPort.fromParts(host, port); - throw invalidConnectionPoint(hostAndPort.toString()); + if (redisClusterNode == null) { + HostAndPort hostAndPort = HostAndPort.fromParts(host, port); + throw invalidConnectionPoint(hostAndPort.toString()); + } } ConnectionKey key = new ConnectionKey(intent, host, port); @@ -181,10 +181,8 @@ private void reconfigurePartitions() { resetWriterCache(); - for (ConnectionKey key : staleConnections) { - RedisAsyncConnectionImpl connection = connections.getIfPresent(key); - connection.close(); - connections.invalidate(key); + if (redisClusterClient.expireStaleConnections()) { + closeStaleConnections(); } } @@ -352,6 +350,11 @@ public int hashCode() { } } + private boolean validateClusterNodeMembership() { + return redisClusterClient.getClusterClientOptions() == null + || redisClusterClient.getClusterClientOptions().isValidateClusterNodeMembership(); + } + private class ConnectionFactory extends CacheLoader> { private final RedisClusterClient redisClusterClient; @@ -380,8 +383,11 @@ public RedisAsyncConnectionImpl load(ConnectionKey key) throws Exception { } if (key.host != null) { - if (getPartition(key.host, key.port) == null) { - throw invalidConnectionPoint(key.host + ":" + key.port); + + if (validateClusterNodeMembership()) { + if (getPartition(key.host, key.port) == null) { + throw invalidConnectionPoint(key.host + ":" + key.port); + } } // Host and port connections do not provide command recovery due to cluster reconfiguration @@ -394,5 +400,6 @@ public RedisAsyncConnectionImpl load(ConnectionKey key) throws Exception { } return connection; } + } } diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java index fe4f7e374f..9951b1f923 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java +++ b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java @@ -1,30 +1,18 @@ package com.lambdaworks.redis.cluster; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Preconditions.*; import static com.lambdaworks.redis.cluster.ClusterTopologyRefresh.RedisUriComparator.INSTANCE; import java.io.Closeable; import java.net.SocketAddress; -import java.util.ArrayDeque; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Queue; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.base.Predicate; import com.google.common.base.Supplier; import com.google.common.collect.Lists; -import com.lambdaworks.redis.AbstractRedisClient; -import com.lambdaworks.redis.RedisAsyncConnectionImpl; -import com.lambdaworks.redis.RedisChannelWriter; -import com.lambdaworks.redis.RedisClusterConnection; -import com.lambdaworks.redis.RedisException; -import com.lambdaworks.redis.RedisURI; +import com.lambdaworks.redis.*; import com.lambdaworks.redis.cluster.models.partitions.Partitions; import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; import com.lambdaworks.redis.codec.RedisCodec; @@ -420,12 +408,19 @@ protected void forEachCloseable(Predicate>() { - @Override - public boolean apply(RedisAdvancedClusterAsyncConnectionImpl input) { - - ClusterDistributionChannelWriter writer = (ClusterDistributionChannelWriter) input - .getChannelWriter(); - writer.getClusterConnectionProvider().closeStaleConnections(); - return true; - } - }); + if (isEventLoopActive() && expireStaleConnections()) { + forEachClusterConnection(new Predicate>() { + @Override + public boolean apply(RedisAdvancedClusterAsyncConnectionImpl input) { + + ClusterDistributionChannelWriter writer = (ClusterDistributionChannelWriter) input + .getChannelWriter(); + writer.getClusterConnectionProvider().closeStaleConnections(); + return true; + } + }); + } } } + + boolean expireStaleConnections() { + return getClusterClientOptions() == null || getClusterClientOptions().isCloseStaleConnections(); + } } diff --git a/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java b/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java index 6a4fb001ad..2170892958 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java @@ -7,6 +7,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import com.lambdaworks.redis.*; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -17,12 +18,6 @@ import com.google.code.tempusfugit.temporal.WaitFor; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import com.lambdaworks.redis.LettuceFutures; -import com.lambdaworks.redis.RedisClusterAsyncConnection; -import com.lambdaworks.redis.RedisClusterConnection; -import com.lambdaworks.redis.RedisException; -import com.lambdaworks.redis.RedisFuture; -import com.lambdaworks.redis.RedisURI; import com.lambdaworks.redis.cluster.models.partitions.Partitions; import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; @@ -161,17 +156,27 @@ public void forbiddenHostOnRedirect() throws Exception { } @Test - public void getConnectionToNotAClusterMember() throws Exception { + public void getConnectionToNotAClusterMemberForbidden() throws Exception { RedisAdvancedClusterConnection sync = clusterClient.connectCluster(); try { - sync.getConnection("8.8.8.8", 1234); + sync.getConnection(TestSettings.host(), TestSettings.port()); } catch (RedisException e) { assertThat(e).hasRootCauseExactlyInstanceOf(IllegalArgumentException.class); } sync.close(); } + + @Test + public void getConnectionToNotAClusterMemberAllowed() throws Exception { + + clusterClient.setOptions(new ClusterClientOptions.Builder().validateClusterNodeMembership(false).build()); + RedisAdvancedClusterConnection sync = clusterClient.connectCluster(); + sync.getConnection(TestSettings.host(), TestSettings.port()); + sync.close(); + } + @Test public void pipelining() throws Exception { diff --git a/src/test/java/com/lambdaworks/redis/cluster/RedisClusterSetupTest.java b/src/test/java/com/lambdaworks/redis/cluster/RedisClusterSetupTest.java index 7c8e171dca..97771ee214 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/RedisClusterSetupTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/RedisClusterSetupTest.java @@ -419,6 +419,43 @@ public boolean isSatisfied() { } + @Test + public void doNotExpireStaleNodeIdConnections() throws Exception { + + clusterClient.setOptions(new ClusterClientOptions.Builder().refreshClusterView(true).closeStaleConnections(false).refreshPeriod(1, TimeUnit.SECONDS) + .build()); + RedisAdvancedClusterAsyncConnection clusterConnection = clusterClient.connectClusterAsync(); + + setup2Masters(); + + PooledClusterConnectionProvider clusterConnectionProvider = getPooledClusterConnectionProvider(clusterConnection); + + assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(0); + + assertRoutedExecution(clusterConnection); + + assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(2); + + Partitions partitions = ClusterPartitionParser.parse(redis1.clusterNodes()); + for (RedisClusterNode redisClusterNode : partitions.getPartitions()) { + if (!redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) { + redis1.clusterForget(redisClusterNode.getNodeId()); + } + } + + partitions = ClusterPartitionParser.parse(redis2.clusterNodes()); + for (RedisClusterNode redisClusterNode : partitions.getPartitions()) { + if (!redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) { + redis2.clusterForget(redisClusterNode.getNodeId()); + } + } + + Thread.sleep(2000); + + assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(2); + + } + @Test public void expireStaleHostAndPortConnections() throws Exception {