From bb191e1460f7ce99541b1da24047c72979544e68 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Fri, 31 Jul 2015 14:34:44 +0200 Subject: [PATCH] Add close stale connections and strict cluster member check flags to ClusterClientOptions #109 Add two flags to ClusterClientOptions: closeStaleConnections: Close stale connections when refreshing the cluster topology Motivation: Connections to nodes, which do not belong to the cluster (anymore) are closed as soon as the cluster topology changes. If one node is no longer part of the cluster, the connections to the node can be closed. One might want to prevent that behavior because one might want still to communicate with the other nodes that come into play when using validateClusterNodeMembership = false validateClusterNodeMembership: Validate the cluster node membership before allowing connections to that node Motivation: The current implementation performs redirects using MOVED and ASK and allows obtaining connections to the particular cluster nodes. The validation was introduced during the development of version 3.3 to prevent security breaches and only allow connections to the known hosts of the CLUSTER NODES output. There are some scenarios, where the strict validation is an obstruction: MOVED/ASK redirection but the cluster topology view is stale Connecting to cluster nodes using different IP's/hostnames (e.g. private/public IP's) Connecting to non-cluster members to reconfigure those while using the RedisClusterClient connection. --- .../redis/cluster/ClusterClientOptions.java | 68 ++++++++++++++++-- .../PooledClusterConnectionProvider.java | 32 ++++++--- .../redis/cluster/RedisClusterClient.java | 36 ++++++---- .../cluster/AdvancedClusterClientTest.java | 14 +++- .../redis/cluster/RedisClusterSetupTest.java | 71 ++++++++++++++++++- 5 files changed, 188 insertions(+), 33 deletions(-) diff --git a/src/main/java/com/lambdaworks/redis/cluster/ClusterClientOptions.java b/src/main/java/com/lambdaworks/redis/cluster/ClusterClientOptions.java index f314b5b75d..b4f0914ab4 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/ClusterClientOptions.java +++ b/src/main/java/com/lambdaworks/redis/cluster/ClusterClientOptions.java @@ -13,9 +13,11 @@ public class ClusterClientOptions extends ClientOptions { private final boolean refreshClusterView; private final long refreshPeriod; private final TimeUnit refreshPeriodUnit; + private final boolean closeStaleConnections; + private final boolean validateClusterNodeMembership; /** - * Create a copy of {@literal options} + * Create a copy of {@literal options}. * * @param options the original * @return A new instance of {@link ClusterClientOptions} containing the values of {@literal options} @@ -32,6 +34,8 @@ public static class Builder extends ClientOptions.Builder { private boolean refreshClusterView = false; private long refreshPeriod = 60; private TimeUnit refreshPeriodUnit = TimeUnit.SECONDS; + private boolean closeStaleConnections = true; + private boolean validateClusterNodeMembership = true; /** * Enable regular cluster topology updates. The client starts updating the cluster topology in the intervals of @@ -47,7 +51,7 @@ public Builder refreshClusterView(boolean refreshClusterView) { } /** - * Set the refresh period. Defaults to {@literal 60 SECONDS} + * Set the refresh period. Defaults to {@literal 60 SECONDS}. * * @param refreshPeriod period for triggering topology updates * @param refreshPeriodUnit unit for {@code refreshPeriod} @@ -59,6 +63,29 @@ public Builder refreshPeriod(long refreshPeriod, TimeUnit refreshPeriodUnit) { return this; } + /** + * Flag, whether to close stale connections when refreshing the cluster topology. Defaults to {@literal true}. Comes + * only into effect if {@link #isRefreshClusterView()} is {@literal true}. + * + * @param closeStaleConnections {@literal true} if stale connections are cleaned up after cluster topology updates + * @return {@code this} + */ + public Builder closeStaleConnections(boolean closeStaleConnections) { + this.closeStaleConnections = closeStaleConnections; + return this; + } + + /** + * Validate the cluster node membership before allowing connections to a cluster node. Defaults to {@literal true}. + * + * @param validateClusterNodeMembership {@literal true} if validation is enabled. + * @return {@code this} + */ + public Builder validateClusterNodeMembership(boolean validateClusterNodeMembership) { + this.validateClusterNodeMembership = validateClusterNodeMembership; + return this; + } + /** * Create a new instance of {@link ClusterClientOptions} * @@ -74,6 +101,8 @@ protected ClusterClientOptions(Builder builder) { this.refreshClusterView = builder.refreshClusterView; this.refreshPeriod = builder.refreshPeriod; this.refreshPeriodUnit = builder.refreshPeriodUnit; + this.closeStaleConnections = builder.closeStaleConnections; + this.validateClusterNodeMembership = builder.validateClusterNodeMembership; } protected ClusterClientOptions(ClusterClientOptions original) { @@ -81,37 +110,62 @@ protected ClusterClientOptions(ClusterClientOptions original) { this.refreshClusterView = original.refreshClusterView; this.refreshPeriod = original.refreshPeriod; this.refreshPeriodUnit = original.refreshPeriodUnit; + this.closeStaleConnections = original.closeStaleConnections; + this.validateClusterNodeMembership = original.validateClusterNodeMembership; } protected ClusterClientOptions() { this.refreshClusterView = false; this.refreshPeriod = 60; this.refreshPeriodUnit = TimeUnit.SECONDS; + this.closeStaleConnections = true; + this.validateClusterNodeMembership = true; } /** * Flag, whether regular cluster topology updates are updated. The client starts updating the cluster topology in the - * intervals of {@link #getRefreshPeriod()} /{@link #getRefreshPeriodUnit()}. Defaults to {@literal false} + * intervals of {@link #getRefreshPeriod()} /{@link #getRefreshPeriodUnit()}. Defaults to {@literal false}. * - * @return + * @return {@literal true} it the cluster topology view is updated periodically */ public boolean isRefreshClusterView() { return refreshClusterView; } /** - * - * @return the period between the regular cluster topology updates. + * Period between the regular cluster topology updates. Defaults to {@literal 60}. + * + * @return the period between the regular cluster topology updates */ public long getRefreshPeriod() { return refreshPeriod; } /** - * + * Unit for the {@link #getRefreshPeriod()}. Defaults to {@link TimeUnit#SECONDS}. + * * @return unit for the {@link #getRefreshPeriod()} */ public TimeUnit getRefreshPeriodUnit() { return refreshPeriodUnit; } + + /** + * Flag, whether to close stale connections when refreshing the cluster topology. Defaults to {@literal true}. Comes only + * into effect if {@link #isRefreshClusterView()} is {@literal true}. + * + * @return {@literal true} if stale connections are cleaned up after cluster topology updates + */ + public boolean isCloseStaleConnections() { + return closeStaleConnections; + } + + /** + * Validate the cluster node membership before allowing connections to a cluster node. Defaults to {@literal true}. + * + * @return {@literal true} if validation is enabled. + */ + public boolean isValidateClusterNodeMembership() { + return validateClusterNodeMembership; + } } diff --git a/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java b/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java index 762fa94758..a418887575 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java +++ b/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java @@ -35,6 +35,7 @@ * @author Mark Paluch * @since 3.0 */ +@SuppressWarnings({ "unchecked", "rawtypes" }) class PooledClusterConnectionProvider implements ClusterConnectionProvider { private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledClusterConnectionProvider.class); @@ -42,6 +43,7 @@ class PooledClusterConnectionProvider implements ClusterConnectionProvider private final LoadingCache> connections; private final boolean debugEnabled; private final StatefulRedisConnection writers[] = new StatefulRedisConnection[SlotHash.SLOT_COUNT]; + private final RedisClusterClient redisClusterClient; private Partitions partitions; private boolean autoFlushCommands = true; @@ -49,6 +51,7 @@ class PooledClusterConnectionProvider implements ClusterConnectionProvider public PooledClusterConnectionProvider(RedisClusterClient redisClusterClient, RedisChannelWriter clusterWriter, RedisCodec redisCodec) { + this.redisClusterClient = redisClusterClient; this.debugEnabled = logger.isDebugEnabled(); this.connections = CacheBuilder.newBuilder().build( new ConnectionFactory(redisClusterClient, redisCodec, clusterWriter)); @@ -110,11 +113,13 @@ public StatefulRedisConnection getConnection(Intent intent, String host, i logger.debug("getConnection(" + intent + ", " + host + ", " + port + ")"); } - RedisClusterNode redisClusterNode = getPartition(host, port); + if (validateClusterNodeMembership()) { + RedisClusterNode redisClusterNode = getPartition(host, port); - if (redisClusterNode == null) { - HostAndPort hostAndPort = HostAndPort.fromParts(host, port); - throw invalidConnectionPoint(hostAndPort.toString()); + if (redisClusterNode == null) { + HostAndPort hostAndPort = HostAndPort.fromParts(host, port); + throw invalidConnectionPoint(hostAndPort.toString()); + } } ConnectionKey key = new ConnectionKey(intent, host, port); @@ -186,10 +191,8 @@ private void reconfigurePartitions() { resetWriterCache(); - for (ConnectionKey key : staleConnections) { - StatefulRedisConnection connection = connections.getIfPresent(key); - connection.close(); - connections.invalidate(key); + if (redisClusterClient.expireStaleConnections()) { + closeStaleConnections(); } } @@ -255,7 +258,6 @@ public void setAutoFlushCommands(boolean autoFlush) { @Override public void flushCommands() { - for (StatefulRedisConnection connection : connections.asMap().values()) { connection.flushCommands(); } @@ -359,6 +361,11 @@ public int hashCode() { } } + private boolean validateClusterNodeMembership() { + return redisClusterClient.getClusterClientOptions() == null + || redisClusterClient.getClusterClientOptions().isValidateClusterNodeMembership(); + } + private class ConnectionFactory extends CacheLoader> { private final RedisClusterClient redisClusterClient; @@ -387,8 +394,11 @@ public StatefulRedisConnection load(ConnectionKey key) throws Exception { } if (key.host != null) { - if (getPartition(key.host, key.port) == null) { - throw invalidConnectionPoint(key.host + ":" + key.port); + + if (validateClusterNodeMembership()) { + if (getPartition(key.host, key.port) == null) { + throw invalidConnectionPoint(key.host + ":" + key.port); + } } // Host and port connections do not provide command recovery due to cluster reconfiguration diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java index 03fafa8ed9..95b768f81a 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java +++ b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java @@ -122,8 +122,8 @@ public RedisClusterClient(List initialUris) { this.initialUris = initialUris; checkNotNull(initialUris, "initialUris must not be null"); checkArgument(!initialUris.isEmpty(), "initialUris must not be empty"); - setDefaultTimeout(getFirstUri().getTimeout(), getFirstUri().getUnit()); + setOptions(new ClusterClientOptions.Builder().build()); } /** @@ -442,13 +442,20 @@ protected void forEachCloseable(Predicate { - ClusterDistributionChannelWriter writer = (ClusterDistributionChannelWriter) input - .getChannelWriter(); - writer.getClusterConnectionProvider().closeStaleConnections(); - }); + if (isEventLoopActive() && expireStaleConnections()) { + forEachClusterConnection(input -> { + ClusterDistributionChannelWriter writer = (ClusterDistributionChannelWriter) input + .getChannelWriter(); + writer.getClusterConnectionProvider().closeStaleConnections(); + }); + } } } + + boolean expireStaleConnections() { + return getClusterClientOptions() == null || getClusterClientOptions().isCloseStaleConnections(); + } } diff --git a/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java b/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java index 24d27485dc..55e0563f72 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java @@ -11,6 +11,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import com.lambdaworks.redis.TestSettings; import com.lambdaworks.redis.api.StatefulRedisConnection; import com.lambdaworks.redis.api.sync.RedisCommands; import com.lambdaworks.redis.cluster.api.StatefulRedisClusterConnection; @@ -483,17 +484,26 @@ public void forbiddenHostOnRedirect() throws Exception { } @Test - public void getConnectionToNotAClusterMember() throws Exception { + public void getConnectionToNotAClusterMemberForbidden() throws Exception { RedisAdvancedClusterConnection sync = clusterClient.connectCluster(); try { - sync.getConnection("8.8.8.8", 1234); + sync.getConnection(TestSettings.host(), TestSettings.port()); } catch (RedisException e) { assertThat(e).hasRootCauseExactlyInstanceOf(IllegalArgumentException.class); } sync.close(); } + @Test + public void getConnectionToNotAClusterMemberAllowed() throws Exception { + + clusterClient.setOptions(new ClusterClientOptions.Builder().validateClusterNodeMembership(false).build()); + RedisAdvancedClusterConnection sync = clusterClient.connectCluster(); + sync.getConnection(TestSettings.host(), TestSettings.port()); + sync.close(); + } + @Test public void pipelining() throws Exception { diff --git a/src/test/java/com/lambdaworks/redis/cluster/RedisClusterSetupTest.java b/src/test/java/com/lambdaworks/redis/cluster/RedisClusterSetupTest.java index 3231695523..3dac0a3744 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/RedisClusterSetupTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/RedisClusterSetupTest.java @@ -320,12 +320,44 @@ protected void shiftAllSlotsToNode1() throws InterruptedException, TimeoutExcept waitForSlots(redis2, 0); - redis1.clusterAddSlots(AbstractClusterTest.createSlots(12000, 16384)); + final RedisClusterNode redis2Partition = getOwnPartition(redis2); + WaitFor.waitOrTimeout(new Condition() { + @Override + public boolean isSatisfied() { + Partitions partitions = ClusterPartitionParser.parse(redis1.clusterNodes()); + RedisClusterNode partition = partitions.getPartitionByNodeId(redis2Partition.getNodeId()); + + if (!partition.getSlots().isEmpty()) { + removeRemaining(partition); + } + + return partition.getSlots().size() == 0; + } + + private void removeRemaining(RedisClusterNode partition) { + try { + int[] ints = toIntArray(partition.getSlots()); + redis1.clusterDelSlots(ints); + } catch (Exception e) { + + } + } + }, timeout(seconds(10))); + + redis1.clusterAddSlots(RedisClusterClientTest.createSlots(12000, 16384)); waitForSlots(redis1, 16384); Wait.untilTrue(clusterRule::isStable).waitOrTimeout(); } + private int[] toIntArray(List source) { + int[] result = new int[source.size()]; + for (int i = 0; i < source.size(); i++) { + result[i] = source.get(i); + } + return result; + } + @Test public void expireStaleNodeIdConnections() throws Exception { @@ -369,6 +401,43 @@ private void assertRoutedExecution(RedisClusterAsyncCommands clu assertExecuted(clusterConnection.set("p", "value")); // 16023 } + @Test + public void doNotExpireStaleNodeIdConnections() throws Exception { + + clusterClient.setOptions(new ClusterClientOptions.Builder().refreshClusterView(true).closeStaleConnections(false) + .refreshPeriod(1, TimeUnit.SECONDS).build()); + RedisAdvancedClusterAsyncCommands clusterConnection = clusterClient.connectClusterAsync(); + + ClusterSetup.setup2Masters(clusterRule); + + PooledClusterConnectionProvider clusterConnectionProvider = getPooledClusterConnectionProvider(clusterConnection); + + assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(0); + + assertRoutedExecution(clusterConnection); + + assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(2); + + Partitions partitions = ClusterPartitionParser.parse(redis1.clusterNodes()); + for (RedisClusterNode redisClusterNode : partitions.getPartitions()) { + if (!redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) { + redis1.clusterForget(redisClusterNode.getNodeId()); + } + } + + partitions = ClusterPartitionParser.parse(redis2.clusterNodes()); + for (RedisClusterNode redisClusterNode : partitions.getPartitions()) { + if (!redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) { + redis2.clusterForget(redisClusterNode.getNodeId()); + } + } + + Thread.sleep(2000); + + assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(2); + + } + @Test public void expireStaleHostAndPortConnections() throws Exception {