diff --git a/src/main/java/com/lambdaworks/redis/cluster/ClusterClientOptions.java b/src/main/java/com/lambdaworks/redis/cluster/ClusterClientOptions.java index f314b5b75d..b4f0914ab4 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/ClusterClientOptions.java +++ b/src/main/java/com/lambdaworks/redis/cluster/ClusterClientOptions.java @@ -13,9 +13,11 @@ public class ClusterClientOptions extends ClientOptions { private final boolean refreshClusterView; private final long refreshPeriod; private final TimeUnit refreshPeriodUnit; + private final boolean closeStaleConnections; + private final boolean validateClusterNodeMembership; /** - * Create a copy of {@literal options} + * Create a copy of {@literal options}. * * @param options the original * @return A new instance of {@link ClusterClientOptions} containing the values of {@literal options} @@ -32,6 +34,8 @@ public static class Builder extends ClientOptions.Builder { private boolean refreshClusterView = false; private long refreshPeriod = 60; private TimeUnit refreshPeriodUnit = TimeUnit.SECONDS; + private boolean closeStaleConnections = true; + private boolean validateClusterNodeMembership = true; /** * Enable regular cluster topology updates. The client starts updating the cluster topology in the intervals of @@ -47,7 +51,7 @@ public Builder refreshClusterView(boolean refreshClusterView) { } /** - * Set the refresh period. Defaults to {@literal 60 SECONDS} + * Set the refresh period. Defaults to {@literal 60 SECONDS}. * * @param refreshPeriod period for triggering topology updates * @param refreshPeriodUnit unit for {@code refreshPeriod} @@ -59,6 +63,29 @@ public Builder refreshPeriod(long refreshPeriod, TimeUnit refreshPeriodUnit) { return this; } + /** + * Flag, whether to close stale connections when refreshing the cluster topology. Defaults to {@literal true}. Comes + * only into effect if {@link #isRefreshClusterView()} is {@literal true}. + * + * @param closeStaleConnections {@literal true} if stale connections are cleaned up after cluster topology updates + * @return {@code this} + */ + public Builder closeStaleConnections(boolean closeStaleConnections) { + this.closeStaleConnections = closeStaleConnections; + return this; + } + + /** + * Validate the cluster node membership before allowing connections to a cluster node. Defaults to {@literal true}. + * + * @param validateClusterNodeMembership {@literal true} if validation is enabled. + * @return {@code this} + */ + public Builder validateClusterNodeMembership(boolean validateClusterNodeMembership) { + this.validateClusterNodeMembership = validateClusterNodeMembership; + return this; + } + /** * Create a new instance of {@link ClusterClientOptions} * @@ -74,6 +101,8 @@ protected ClusterClientOptions(Builder builder) { this.refreshClusterView = builder.refreshClusterView; this.refreshPeriod = builder.refreshPeriod; this.refreshPeriodUnit = builder.refreshPeriodUnit; + this.closeStaleConnections = builder.closeStaleConnections; + this.validateClusterNodeMembership = builder.validateClusterNodeMembership; } protected ClusterClientOptions(ClusterClientOptions original) { @@ -81,37 +110,62 @@ protected ClusterClientOptions(ClusterClientOptions original) { this.refreshClusterView = original.refreshClusterView; this.refreshPeriod = original.refreshPeriod; this.refreshPeriodUnit = original.refreshPeriodUnit; + this.closeStaleConnections = original.closeStaleConnections; + this.validateClusterNodeMembership = original.validateClusterNodeMembership; } protected ClusterClientOptions() { this.refreshClusterView = false; this.refreshPeriod = 60; this.refreshPeriodUnit = TimeUnit.SECONDS; + this.closeStaleConnections = true; + this.validateClusterNodeMembership = true; } /** * Flag, whether regular cluster topology updates are updated. The client starts updating the cluster topology in the - * intervals of {@link #getRefreshPeriod()} /{@link #getRefreshPeriodUnit()}. Defaults to {@literal false} + * intervals of {@link #getRefreshPeriod()} /{@link #getRefreshPeriodUnit()}. Defaults to {@literal false}. * - * @return + * @return {@literal true} it the cluster topology view is updated periodically */ public boolean isRefreshClusterView() { return refreshClusterView; } /** - * - * @return the period between the regular cluster topology updates. + * Period between the regular cluster topology updates. Defaults to {@literal 60}. + * + * @return the period between the regular cluster topology updates */ public long getRefreshPeriod() { return refreshPeriod; } /** - * + * Unit for the {@link #getRefreshPeriod()}. Defaults to {@link TimeUnit#SECONDS}. + * * @return unit for the {@link #getRefreshPeriod()} */ public TimeUnit getRefreshPeriodUnit() { return refreshPeriodUnit; } + + /** + * Flag, whether to close stale connections when refreshing the cluster topology. Defaults to {@literal true}. Comes only + * into effect if {@link #isRefreshClusterView()} is {@literal true}. + * + * @return {@literal true} if stale connections are cleaned up after cluster topology updates + */ + public boolean isCloseStaleConnections() { + return closeStaleConnections; + } + + /** + * Validate the cluster node membership before allowing connections to a cluster node. Defaults to {@literal true}. + * + * @return {@literal true} if validation is enabled. + */ + public boolean isValidateClusterNodeMembership() { + return validateClusterNodeMembership; + } } diff --git a/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java b/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java index 3925e98538..d6bc6ad4d5 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java +++ b/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java @@ -15,11 +15,7 @@ import com.google.common.collect.Sets; import com.google.common.net.HostAndPort; import com.google.common.util.concurrent.UncheckedExecutionException; -import com.lambdaworks.redis.RedisAsyncConnection; -import com.lambdaworks.redis.RedisAsyncConnectionImpl; -import com.lambdaworks.redis.RedisChannelWriter; -import com.lambdaworks.redis.RedisException; -import com.lambdaworks.redis.RedisURI; +import com.lambdaworks.redis.*; import com.lambdaworks.redis.cluster.models.partitions.Partitions; import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; import com.lambdaworks.redis.codec.RedisCodec; @@ -43,6 +39,7 @@ class PooledClusterConnectionProvider implements ClusterConnectionProvider private final LoadingCache> connections; private final boolean debugEnabled; private final RedisAsyncConnectionImpl writers[] = new RedisAsyncConnectionImpl[SlotHash.SLOT_COUNT]; + private final RedisClusterClient redisClusterClient; private Partitions partitions; private boolean autoFlushCommands = true; @@ -50,6 +47,7 @@ class PooledClusterConnectionProvider implements ClusterConnectionProvider public PooledClusterConnectionProvider(RedisClusterClient redisClusterClient, RedisChannelWriter clusterWriter, RedisCodec redisCodec) { + this.redisClusterClient = redisClusterClient; this.debugEnabled = logger.isDebugEnabled(); this.connections = CacheBuilder.newBuilder().build( new ConnectionFactory(redisClusterClient, redisCodec, clusterWriter)); @@ -108,11 +106,13 @@ public RedisAsyncConnectionImpl getConnection(Intent intent, String host, logger.debug("getConnection(" + intent + ", " + host + ", " + port + ")"); } - RedisClusterNode redisClusterNode = getPartition(host, port); + if (validateClusterNodeMembership()) { + RedisClusterNode redisClusterNode = getPartition(host, port); - if (redisClusterNode == null) { - HostAndPort hostAndPort = HostAndPort.fromParts(host, port); - throw invalidConnectionPoint(hostAndPort.toString()); + if (redisClusterNode == null) { + HostAndPort hostAndPort = HostAndPort.fromParts(host, port); + throw invalidConnectionPoint(hostAndPort.toString()); + } } ConnectionKey key = new ConnectionKey(intent, host, port); @@ -181,10 +181,8 @@ private void reconfigurePartitions() { resetWriterCache(); - for (ConnectionKey key : staleConnections) { - RedisAsyncConnectionImpl connection = connections.getIfPresent(key); - connection.close(); - connections.invalidate(key); + if (redisClusterClient.expireStaleConnections()) { + closeStaleConnections(); } } @@ -352,6 +350,11 @@ public int hashCode() { } } + private boolean validateClusterNodeMembership() { + return redisClusterClient.getClusterClientOptions() == null + || redisClusterClient.getClusterClientOptions().isValidateClusterNodeMembership(); + } + private class ConnectionFactory extends CacheLoader> { private final RedisClusterClient redisClusterClient; @@ -380,8 +383,11 @@ public RedisAsyncConnectionImpl load(ConnectionKey key) throws Exception { } if (key.host != null) { - if (getPartition(key.host, key.port) == null) { - throw invalidConnectionPoint(key.host + ":" + key.port); + + if (validateClusterNodeMembership()) { + if (getPartition(key.host, key.port) == null) { + throw invalidConnectionPoint(key.host + ":" + key.port); + } } // Host and port connections do not provide command recovery due to cluster reconfiguration @@ -394,5 +400,6 @@ public RedisAsyncConnectionImpl load(ConnectionKey key) throws Exception { } return connection; } + } } diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java index fe4f7e374f..9951b1f923 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java +++ b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java @@ -1,30 +1,18 @@ package com.lambdaworks.redis.cluster; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Preconditions.*; import static com.lambdaworks.redis.cluster.ClusterTopologyRefresh.RedisUriComparator.INSTANCE; import java.io.Closeable; import java.net.SocketAddress; -import java.util.ArrayDeque; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Queue; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.base.Predicate; import com.google.common.base.Supplier; import com.google.common.collect.Lists; -import com.lambdaworks.redis.AbstractRedisClient; -import com.lambdaworks.redis.RedisAsyncConnectionImpl; -import com.lambdaworks.redis.RedisChannelWriter; -import com.lambdaworks.redis.RedisClusterConnection; -import com.lambdaworks.redis.RedisException; -import com.lambdaworks.redis.RedisURI; +import com.lambdaworks.redis.*; import com.lambdaworks.redis.cluster.models.partitions.Partitions; import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; import com.lambdaworks.redis.codec.RedisCodec; @@ -420,12 +408,19 @@ protected void forEachCloseable(Predicate>() { - @Override - public boolean apply(RedisAdvancedClusterAsyncConnectionImpl input) { - - ClusterDistributionChannelWriter writer = (ClusterDistributionChannelWriter) input - .getChannelWriter(); - writer.getClusterConnectionProvider().closeStaleConnections(); - return true; - } - }); + if (isEventLoopActive() && expireStaleConnections()) { + forEachClusterConnection(new Predicate>() { + @Override + public boolean apply(RedisAdvancedClusterAsyncConnectionImpl input) { + + ClusterDistributionChannelWriter writer = (ClusterDistributionChannelWriter) input + .getChannelWriter(); + writer.getClusterConnectionProvider().closeStaleConnections(); + return true; + } + }); + } } } + + boolean expireStaleConnections() { + return getClusterClientOptions() == null || getClusterClientOptions().isCloseStaleConnections(); + } } diff --git a/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java b/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java index 6a4fb001ad..2170892958 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java @@ -7,6 +7,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import com.lambdaworks.redis.*; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -17,12 +18,6 @@ import com.google.code.tempusfugit.temporal.WaitFor; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import com.lambdaworks.redis.LettuceFutures; -import com.lambdaworks.redis.RedisClusterAsyncConnection; -import com.lambdaworks.redis.RedisClusterConnection; -import com.lambdaworks.redis.RedisException; -import com.lambdaworks.redis.RedisFuture; -import com.lambdaworks.redis.RedisURI; import com.lambdaworks.redis.cluster.models.partitions.Partitions; import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; @@ -161,17 +156,27 @@ public void forbiddenHostOnRedirect() throws Exception { } @Test - public void getConnectionToNotAClusterMember() throws Exception { + public void getConnectionToNotAClusterMemberForbidden() throws Exception { RedisAdvancedClusterConnection sync = clusterClient.connectCluster(); try { - sync.getConnection("8.8.8.8", 1234); + sync.getConnection(TestSettings.host(), TestSettings.port()); } catch (RedisException e) { assertThat(e).hasRootCauseExactlyInstanceOf(IllegalArgumentException.class); } sync.close(); } + + @Test + public void getConnectionToNotAClusterMemberAllowed() throws Exception { + + clusterClient.setOptions(new ClusterClientOptions.Builder().validateClusterNodeMembership(false).build()); + RedisAdvancedClusterConnection sync = clusterClient.connectCluster(); + sync.getConnection(TestSettings.host(), TestSettings.port()); + sync.close(); + } + @Test public void pipelining() throws Exception { diff --git a/src/test/java/com/lambdaworks/redis/cluster/RedisClusterSetupTest.java b/src/test/java/com/lambdaworks/redis/cluster/RedisClusterSetupTest.java index 7c8e171dca..97771ee214 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/RedisClusterSetupTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/RedisClusterSetupTest.java @@ -419,6 +419,43 @@ public boolean isSatisfied() { } + @Test + public void doNotExpireStaleNodeIdConnections() throws Exception { + + clusterClient.setOptions(new ClusterClientOptions.Builder().refreshClusterView(true).closeStaleConnections(false).refreshPeriod(1, TimeUnit.SECONDS) + .build()); + RedisAdvancedClusterAsyncConnection clusterConnection = clusterClient.connectClusterAsync(); + + setup2Masters(); + + PooledClusterConnectionProvider clusterConnectionProvider = getPooledClusterConnectionProvider(clusterConnection); + + assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(0); + + assertRoutedExecution(clusterConnection); + + assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(2); + + Partitions partitions = ClusterPartitionParser.parse(redis1.clusterNodes()); + for (RedisClusterNode redisClusterNode : partitions.getPartitions()) { + if (!redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) { + redis1.clusterForget(redisClusterNode.getNodeId()); + } + } + + partitions = ClusterPartitionParser.parse(redis2.clusterNodes()); + for (RedisClusterNode redisClusterNode : partitions.getPartitions()) { + if (!redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) { + redis2.clusterForget(redisClusterNode.getNodeId()); + } + } + + Thread.sleep(2000); + + assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(2); + + } + @Test public void expireStaleHostAndPortConnections() throws Exception {