diff --git a/src/main/java/com/lambdaworks/redis/cluster/ClusterClientOptions.java b/src/main/java/com/lambdaworks/redis/cluster/ClusterClientOptions.java
index f314b5b75d..b4f0914ab4 100644
--- a/src/main/java/com/lambdaworks/redis/cluster/ClusterClientOptions.java
+++ b/src/main/java/com/lambdaworks/redis/cluster/ClusterClientOptions.java
@@ -13,9 +13,11 @@ public class ClusterClientOptions extends ClientOptions {
private final boolean refreshClusterView;
private final long refreshPeriod;
private final TimeUnit refreshPeriodUnit;
+ private final boolean closeStaleConnections;
+ private final boolean validateClusterNodeMembership;
/**
- * Create a copy of {@literal options}
+ * Create a copy of {@literal options}.
*
* @param options the original
* @return A new instance of {@link ClusterClientOptions} containing the values of {@literal options}
@@ -32,6 +34,8 @@ public static class Builder extends ClientOptions.Builder {
private boolean refreshClusterView = false;
private long refreshPeriod = 60;
private TimeUnit refreshPeriodUnit = TimeUnit.SECONDS;
+ private boolean closeStaleConnections = true;
+ private boolean validateClusterNodeMembership = true;
/**
* Enable regular cluster topology updates. The client starts updating the cluster topology in the intervals of
@@ -47,7 +51,7 @@ public Builder refreshClusterView(boolean refreshClusterView) {
}
/**
- * Set the refresh period. Defaults to {@literal 60 SECONDS}
+ * Set the refresh period. Defaults to {@literal 60 SECONDS}.
*
* @param refreshPeriod period for triggering topology updates
* @param refreshPeriodUnit unit for {@code refreshPeriod}
@@ -59,6 +63,29 @@ public Builder refreshPeriod(long refreshPeriod, TimeUnit refreshPeriodUnit) {
return this;
}
+ /**
+ * Flag, whether to close stale connections when refreshing the cluster topology. Defaults to {@literal true}. Comes
+ * only into effect if {@link #isRefreshClusterView()} is {@literal true}.
+ *
+ * @param closeStaleConnections {@literal true} if stale connections are cleaned up after cluster topology updates
+ * @return {@code this}
+ */
+ public Builder closeStaleConnections(boolean closeStaleConnections) {
+ this.closeStaleConnections = closeStaleConnections;
+ return this;
+ }
+
+ /**
+ * Validate the cluster node membership before allowing connections to a cluster node. Defaults to {@literal true}.
+ *
+ * @param validateClusterNodeMembership {@literal true} if validation is enabled.
+ * @return {@code this}
+ */
+ public Builder validateClusterNodeMembership(boolean validateClusterNodeMembership) {
+ this.validateClusterNodeMembership = validateClusterNodeMembership;
+ return this;
+ }
+
/**
* Create a new instance of {@link ClusterClientOptions}
*
@@ -74,6 +101,8 @@ protected ClusterClientOptions(Builder builder) {
this.refreshClusterView = builder.refreshClusterView;
this.refreshPeriod = builder.refreshPeriod;
this.refreshPeriodUnit = builder.refreshPeriodUnit;
+ this.closeStaleConnections = builder.closeStaleConnections;
+ this.validateClusterNodeMembership = builder.validateClusterNodeMembership;
}
protected ClusterClientOptions(ClusterClientOptions original) {
@@ -81,37 +110,62 @@ protected ClusterClientOptions(ClusterClientOptions original) {
this.refreshClusterView = original.refreshClusterView;
this.refreshPeriod = original.refreshPeriod;
this.refreshPeriodUnit = original.refreshPeriodUnit;
+ this.closeStaleConnections = original.closeStaleConnections;
+ this.validateClusterNodeMembership = original.validateClusterNodeMembership;
}
protected ClusterClientOptions() {
this.refreshClusterView = false;
this.refreshPeriod = 60;
this.refreshPeriodUnit = TimeUnit.SECONDS;
+ this.closeStaleConnections = true;
+ this.validateClusterNodeMembership = true;
}
/**
* Flag, whether regular cluster topology updates are updated. The client starts updating the cluster topology in the
- * intervals of {@link #getRefreshPeriod()} /{@link #getRefreshPeriodUnit()}. Defaults to {@literal false}
+ * intervals of {@link #getRefreshPeriod()} /{@link #getRefreshPeriodUnit()}. Defaults to {@literal false}.
*
- * @return
+ * @return {@literal true} it the cluster topology view is updated periodically
*/
public boolean isRefreshClusterView() {
return refreshClusterView;
}
/**
- *
- * @return the period between the regular cluster topology updates.
+ * Period between the regular cluster topology updates. Defaults to {@literal 60}.
+ *
+ * @return the period between the regular cluster topology updates
*/
public long getRefreshPeriod() {
return refreshPeriod;
}
/**
- *
+ * Unit for the {@link #getRefreshPeriod()}. Defaults to {@link TimeUnit#SECONDS}.
+ *
* @return unit for the {@link #getRefreshPeriod()}
*/
public TimeUnit getRefreshPeriodUnit() {
return refreshPeriodUnit;
}
+
+ /**
+ * Flag, whether to close stale connections when refreshing the cluster topology. Defaults to {@literal true}. Comes only
+ * into effect if {@link #isRefreshClusterView()} is {@literal true}.
+ *
+ * @return {@literal true} if stale connections are cleaned up after cluster topology updates
+ */
+ public boolean isCloseStaleConnections() {
+ return closeStaleConnections;
+ }
+
+ /**
+ * Validate the cluster node membership before allowing connections to a cluster node. Defaults to {@literal true}.
+ *
+ * @return {@literal true} if validation is enabled.
+ */
+ public boolean isValidateClusterNodeMembership() {
+ return validateClusterNodeMembership;
+ }
}
diff --git a/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java b/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java
index 762fa94758..a418887575 100644
--- a/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java
+++ b/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java
@@ -35,6 +35,7 @@
* @author Mark Paluch
* @since 3.0
*/
+@SuppressWarnings({ "unchecked", "rawtypes" })
class PooledClusterConnectionProvider implements ClusterConnectionProvider {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledClusterConnectionProvider.class);
@@ -42,6 +43,7 @@ class PooledClusterConnectionProvider implements ClusterConnectionProvider
private final LoadingCache> connections;
private final boolean debugEnabled;
private final StatefulRedisConnection writers[] = new StatefulRedisConnection[SlotHash.SLOT_COUNT];
+ private final RedisClusterClient redisClusterClient;
private Partitions partitions;
private boolean autoFlushCommands = true;
@@ -49,6 +51,7 @@ class PooledClusterConnectionProvider implements ClusterConnectionProvider
public PooledClusterConnectionProvider(RedisClusterClient redisClusterClient, RedisChannelWriter clusterWriter,
RedisCodec redisCodec) {
+ this.redisClusterClient = redisClusterClient;
this.debugEnabled = logger.isDebugEnabled();
this.connections = CacheBuilder.newBuilder().build(
new ConnectionFactory(redisClusterClient, redisCodec, clusterWriter));
@@ -110,11 +113,13 @@ public StatefulRedisConnection getConnection(Intent intent, String host, i
logger.debug("getConnection(" + intent + ", " + host + ", " + port + ")");
}
- RedisClusterNode redisClusterNode = getPartition(host, port);
+ if (validateClusterNodeMembership()) {
+ RedisClusterNode redisClusterNode = getPartition(host, port);
- if (redisClusterNode == null) {
- HostAndPort hostAndPort = HostAndPort.fromParts(host, port);
- throw invalidConnectionPoint(hostAndPort.toString());
+ if (redisClusterNode == null) {
+ HostAndPort hostAndPort = HostAndPort.fromParts(host, port);
+ throw invalidConnectionPoint(hostAndPort.toString());
+ }
}
ConnectionKey key = new ConnectionKey(intent, host, port);
@@ -186,10 +191,8 @@ private void reconfigurePartitions() {
resetWriterCache();
- for (ConnectionKey key : staleConnections) {
- StatefulRedisConnection connection = connections.getIfPresent(key);
- connection.close();
- connections.invalidate(key);
+ if (redisClusterClient.expireStaleConnections()) {
+ closeStaleConnections();
}
}
@@ -255,7 +258,6 @@ public void setAutoFlushCommands(boolean autoFlush) {
@Override
public void flushCommands() {
-
for (StatefulRedisConnection connection : connections.asMap().values()) {
connection.flushCommands();
}
@@ -359,6 +361,11 @@ public int hashCode() {
}
}
+ private boolean validateClusterNodeMembership() {
+ return redisClusterClient.getClusterClientOptions() == null
+ || redisClusterClient.getClusterClientOptions().isValidateClusterNodeMembership();
+ }
+
private class ConnectionFactory extends CacheLoader> {
private final RedisClusterClient redisClusterClient;
@@ -387,8 +394,11 @@ public StatefulRedisConnection load(ConnectionKey key) throws Exception {
}
if (key.host != null) {
- if (getPartition(key.host, key.port) == null) {
- throw invalidConnectionPoint(key.host + ":" + key.port);
+
+ if (validateClusterNodeMembership()) {
+ if (getPartition(key.host, key.port) == null) {
+ throw invalidConnectionPoint(key.host + ":" + key.port);
+ }
}
// Host and port connections do not provide command recovery due to cluster reconfiguration
diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java
index 03fafa8ed9..95b768f81a 100644
--- a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java
+++ b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java
@@ -122,8 +122,8 @@ public RedisClusterClient(List initialUris) {
this.initialUris = initialUris;
checkNotNull(initialUris, "initialUris must not be null");
checkArgument(!initialUris.isEmpty(), "initialUris must not be empty");
-
setDefaultTimeout(getFirstUri().getTimeout(), getFirstUri().getUnit());
+ setOptions(new ClusterClientOptions.Builder().build());
}
/**
@@ -442,13 +442,20 @@ protected void forEachCloseable(Predicate super Closeabl
/**
* Set the {@link ClusterClientOptions} for the client.
- *
- * @param clientOptions
+ *
+ * @param clientOptions client options for the client and connections that are created after setting the options
*/
public void setOptions(ClusterClientOptions clientOptions) {
super.setOptions(clientOptions);
}
+ ClusterClientOptions getClusterClientOptions() {
+ if (getOptions() instanceof ClusterClientOptions) {
+ return (ClusterClientOptions) getOptions();
+ }
+ return null;
+ }
+
private class ClusterTopologyRefreshTask implements Runnable {
public ClusterTopologyRefreshTask() {
@@ -457,9 +464,8 @@ public ClusterTopologyRefreshTask() {
@Override
public void run() {
logger.debug("ClusterTopologyRefreshTask.run()");
- if (isEventLoopActive() && getOptions() instanceof ClusterClientOptions) {
- ClusterClientOptions options = (ClusterClientOptions) getOptions();
- if (!options.isRefreshClusterView()) {
+ if (isEventLoopActive() && getClusterClientOptions() != null) {
+ if (!getClusterClientOptions().isRefreshClusterView()) {
logger.debug("ClusterTopologyRefreshTask is disabled");
return;
}
@@ -486,7 +492,7 @@ public void run() {
getPartitions().reload(values.get(0).getPartitions());
updatePartitionsInConnections();
- if (isEventLoopActive()) {
+ if (isEventLoopActive() && expireStaleConnections()) {
genericWorkerPool.submit(new CloseStaleConnectionsTask());
}
@@ -497,12 +503,18 @@ public void run() {
private class CloseStaleConnectionsTask implements Runnable {
@Override
public void run() {
- forEachClusterConnection(input -> {
- ClusterDistributionChannelWriter, ?> writer = (ClusterDistributionChannelWriter, ?>) input
- .getChannelWriter();
- writer.getClusterConnectionProvider().closeStaleConnections();
- });
+ if (isEventLoopActive() && expireStaleConnections()) {
+ forEachClusterConnection(input -> {
+ ClusterDistributionChannelWriter, ?> writer = (ClusterDistributionChannelWriter, ?>) input
+ .getChannelWriter();
+ writer.getClusterConnectionProvider().closeStaleConnections();
+ });
+ }
}
}
+
+ boolean expireStaleConnections() {
+ return getClusterClientOptions() == null || getClusterClientOptions().isCloseStaleConnections();
+ }
}
diff --git a/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java b/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java
index 24d27485dc..55e0563f72 100644
--- a/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java
+++ b/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java
@@ -11,6 +11,7 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
+import com.lambdaworks.redis.TestSettings;
import com.lambdaworks.redis.api.StatefulRedisConnection;
import com.lambdaworks.redis.api.sync.RedisCommands;
import com.lambdaworks.redis.cluster.api.StatefulRedisClusterConnection;
@@ -483,17 +484,26 @@ public void forbiddenHostOnRedirect() throws Exception {
}
@Test
- public void getConnectionToNotAClusterMember() throws Exception {
+ public void getConnectionToNotAClusterMemberForbidden() throws Exception {
RedisAdvancedClusterConnection sync = clusterClient.connectCluster();
try {
- sync.getConnection("8.8.8.8", 1234);
+ sync.getConnection(TestSettings.host(), TestSettings.port());
} catch (RedisException e) {
assertThat(e).hasRootCauseExactlyInstanceOf(IllegalArgumentException.class);
}
sync.close();
}
+ @Test
+ public void getConnectionToNotAClusterMemberAllowed() throws Exception {
+
+ clusterClient.setOptions(new ClusterClientOptions.Builder().validateClusterNodeMembership(false).build());
+ RedisAdvancedClusterConnection sync = clusterClient.connectCluster();
+ sync.getConnection(TestSettings.host(), TestSettings.port());
+ sync.close();
+ }
+
@Test
public void pipelining() throws Exception {
diff --git a/src/test/java/com/lambdaworks/redis/cluster/RedisClusterSetupTest.java b/src/test/java/com/lambdaworks/redis/cluster/RedisClusterSetupTest.java
index 3231695523..3dac0a3744 100644
--- a/src/test/java/com/lambdaworks/redis/cluster/RedisClusterSetupTest.java
+++ b/src/test/java/com/lambdaworks/redis/cluster/RedisClusterSetupTest.java
@@ -320,12 +320,44 @@ protected void shiftAllSlotsToNode1() throws InterruptedException, TimeoutExcept
waitForSlots(redis2, 0);
- redis1.clusterAddSlots(AbstractClusterTest.createSlots(12000, 16384));
+ final RedisClusterNode redis2Partition = getOwnPartition(redis2);
+ WaitFor.waitOrTimeout(new Condition() {
+ @Override
+ public boolean isSatisfied() {
+ Partitions partitions = ClusterPartitionParser.parse(redis1.clusterNodes());
+ RedisClusterNode partition = partitions.getPartitionByNodeId(redis2Partition.getNodeId());
+
+ if (!partition.getSlots().isEmpty()) {
+ removeRemaining(partition);
+ }
+
+ return partition.getSlots().size() == 0;
+ }
+
+ private void removeRemaining(RedisClusterNode partition) {
+ try {
+ int[] ints = toIntArray(partition.getSlots());
+ redis1.clusterDelSlots(ints);
+ } catch (Exception e) {
+
+ }
+ }
+ }, timeout(seconds(10)));
+
+ redis1.clusterAddSlots(RedisClusterClientTest.createSlots(12000, 16384));
waitForSlots(redis1, 16384);
Wait.untilTrue(clusterRule::isStable).waitOrTimeout();
}
+ private int[] toIntArray(List source) {
+ int[] result = new int[source.size()];
+ for (int i = 0; i < source.size(); i++) {
+ result[i] = source.get(i);
+ }
+ return result;
+ }
+
@Test
public void expireStaleNodeIdConnections() throws Exception {
@@ -369,6 +401,43 @@ private void assertRoutedExecution(RedisClusterAsyncCommands clu
assertExecuted(clusterConnection.set("p", "value")); // 16023
}
+ @Test
+ public void doNotExpireStaleNodeIdConnections() throws Exception {
+
+ clusterClient.setOptions(new ClusterClientOptions.Builder().refreshClusterView(true).closeStaleConnections(false)
+ .refreshPeriod(1, TimeUnit.SECONDS).build());
+ RedisAdvancedClusterAsyncCommands clusterConnection = clusterClient.connectClusterAsync();
+
+ ClusterSetup.setup2Masters(clusterRule);
+
+ PooledClusterConnectionProvider, ?> clusterConnectionProvider = getPooledClusterConnectionProvider(clusterConnection);
+
+ assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(0);
+
+ assertRoutedExecution(clusterConnection);
+
+ assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(2);
+
+ Partitions partitions = ClusterPartitionParser.parse(redis1.clusterNodes());
+ for (RedisClusterNode redisClusterNode : partitions.getPartitions()) {
+ if (!redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) {
+ redis1.clusterForget(redisClusterNode.getNodeId());
+ }
+ }
+
+ partitions = ClusterPartitionParser.parse(redis2.clusterNodes());
+ for (RedisClusterNode redisClusterNode : partitions.getPartitions()) {
+ if (!redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) {
+ redis2.clusterForget(redisClusterNode.getNodeId());
+ }
+ }
+
+ Thread.sleep(2000);
+
+ assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(2);
+
+ }
+
@Test
public void expireStaleHostAndPortConnections() throws Exception {