Skip to content

Commit

Permalink
Add close stale connections and strict cluster member check flags to …
Browse files Browse the repository at this point in the history
…ClusterClientOptions #109

Add two flags to ClusterClientOptions:

closeStaleConnections: Close stale connections when refreshing the cluster topology
Motivation: Connections to nodes, which do not belong to the cluster (anymore) are closed as soon as the cluster topology changes. If one node is no longer part of the cluster, the connections to the node can be closed. One might want to prevent that behavior because one might want still to communicate with the other nodes that come into play when using validateClusterNodeMembership = false
validateClusterNodeMembership: Validate the cluster node membership before allowing connections to that node Motivation: The current implementation performs redirects using MOVED and ASK and allows obtaining connections to the particular cluster nodes. The validation was introduced during the development of version 3.3 to prevent security breaches and only allow connections to the known hosts of the CLUSTER NODES output. There are some scenarios, where the strict validation is an obstruction:
MOVED/ASK redirection but the cluster topology view is stale
Connecting to cluster nodes using different IP's/hostnames (e.g. private/public IP's)
Connecting to non-cluster members to reconfigure those while using the RedisClusterClient connection.
  • Loading branch information
mp911de committed Jul 31, 2015
1 parent 6ac6a1c commit 0595ff6
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ public class ClusterClientOptions extends ClientOptions {
private final boolean refreshClusterView;
private final long refreshPeriod;
private final TimeUnit refreshPeriodUnit;
private final boolean closeStaleConnections;
private final boolean validateClusterNodeMembership;

/**
* Create a copy of {@literal options}
* Create a copy of {@literal options}.
*
* @param options the original
* @return A new instance of {@link ClusterClientOptions} containing the values of {@literal options}
Expand All @@ -32,6 +34,8 @@ public static class Builder extends ClientOptions.Builder {
private boolean refreshClusterView = false;
private long refreshPeriod = 60;
private TimeUnit refreshPeriodUnit = TimeUnit.SECONDS;
private boolean closeStaleConnections = true;
private boolean validateClusterNodeMembership = true;

/**
* Enable regular cluster topology updates. The client starts updating the cluster topology in the intervals of
Expand All @@ -47,7 +51,7 @@ public Builder refreshClusterView(boolean refreshClusterView) {
}

/**
* Set the refresh period. Defaults to {@literal 60 SECONDS}
* Set the refresh period. Defaults to {@literal 60 SECONDS}.
*
* @param refreshPeriod period for triggering topology updates
* @param refreshPeriodUnit unit for {@code refreshPeriod}
Expand All @@ -59,6 +63,29 @@ public Builder refreshPeriod(long refreshPeriod, TimeUnit refreshPeriodUnit) {
return this;
}

/**
* Flag, whether to close stale connections when refreshing the cluster topology. Defaults to {@literal true}. Comes
* only into effect if {@link #isRefreshClusterView()} is {@literal true}.
*
* @param closeStaleConnections {@literal true} if stale connections are cleaned up after cluster topology updates
* @return {@code this}
*/
public Builder closeStaleConnections(boolean closeStaleConnections) {
this.closeStaleConnections = closeStaleConnections;
return this;
}

/**
* Validate the cluster node membership before allowing connections to a cluster node. Defaults to {@literal true}.
*
* @param validateClusterNodeMembership {@literal true} if validation is enabled.
* @return {@code this}
*/
public Builder validateClusterNodeMembership(boolean validateClusterNodeMembership) {
this.validateClusterNodeMembership = validateClusterNodeMembership;
return this;
}

/**
* Create a new instance of {@link ClusterClientOptions}
*
Expand All @@ -74,44 +101,71 @@ protected ClusterClientOptions(Builder builder) {
this.refreshClusterView = builder.refreshClusterView;
this.refreshPeriod = builder.refreshPeriod;
this.refreshPeriodUnit = builder.refreshPeriodUnit;
this.closeStaleConnections = builder.closeStaleConnections;
this.validateClusterNodeMembership = builder.validateClusterNodeMembership;
}

protected ClusterClientOptions(ClusterClientOptions original) {
super(original);
this.refreshClusterView = original.refreshClusterView;
this.refreshPeriod = original.refreshPeriod;
this.refreshPeriodUnit = original.refreshPeriodUnit;
this.closeStaleConnections = original.closeStaleConnections;
this.validateClusterNodeMembership = original.validateClusterNodeMembership;
}

protected ClusterClientOptions() {
this.refreshClusterView = false;
this.refreshPeriod = 60;
this.refreshPeriodUnit = TimeUnit.SECONDS;
this.closeStaleConnections = true;
this.validateClusterNodeMembership = true;
}

/**
* Flag, whether regular cluster topology updates are updated. The client starts updating the cluster topology in the
* intervals of {@link #getRefreshPeriod()} /{@link #getRefreshPeriodUnit()}. Defaults to {@literal false}
* intervals of {@link #getRefreshPeriod()} /{@link #getRefreshPeriodUnit()}. Defaults to {@literal false}.
*
* @return
* @return {@literal true} it the cluster topology view is updated periodically
*/
public boolean isRefreshClusterView() {
return refreshClusterView;
}

/**
*
* @return the period between the regular cluster topology updates.
* Period between the regular cluster topology updates. Defaults to {@literal 60}.
*
* @return the period between the regular cluster topology updates
*/
public long getRefreshPeriod() {
return refreshPeriod;
}

/**
*
* Unit for the {@link #getRefreshPeriod()}. Defaults to {@link TimeUnit#SECONDS}.
*
* @return unit for the {@link #getRefreshPeriod()}
*/
public TimeUnit getRefreshPeriodUnit() {
return refreshPeriodUnit;
}

/**
* Flag, whether to close stale connections when refreshing the cluster topology. Defaults to {@literal true}. Comes only
* into effect if {@link #isRefreshClusterView()} is {@literal true}.
*
* @return {@literal true} if stale connections are cleaned up after cluster topology updates
*/
public boolean isCloseStaleConnections() {
return closeStaleConnections;
}

/**
* Validate the cluster node membership before allowing connections to a cluster node. Defaults to {@literal true}.
*
* @return {@literal true} if validation is enabled.
*/
public boolean isValidateClusterNodeMembership() {
return validateClusterNodeMembership;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
import com.google.common.collect.Sets;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.RedisAsyncConnectionImpl;
import com.lambdaworks.redis.RedisChannelWriter;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.*;
import com.lambdaworks.redis.cluster.models.partitions.Partitions;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;
import com.lambdaworks.redis.codec.RedisCodec;
Expand All @@ -43,13 +39,15 @@ class PooledClusterConnectionProvider<K, V> implements ClusterConnectionProvider
private final LoadingCache<ConnectionKey, RedisAsyncConnectionImpl<K, V>> connections;
private final boolean debugEnabled;
private final RedisAsyncConnectionImpl<K, V> writers[] = new RedisAsyncConnectionImpl[SlotHash.SLOT_COUNT];
private final RedisClusterClient redisClusterClient;
private Partitions partitions;

private boolean autoFlushCommands = true;
private Object stateLock = new Object();

public PooledClusterConnectionProvider(RedisClusterClient redisClusterClient, RedisChannelWriter<K, V> clusterWriter,
RedisCodec<K, V> redisCodec) {
this.redisClusterClient = redisClusterClient;
this.debugEnabled = logger.isDebugEnabled();
this.connections = CacheBuilder.newBuilder().build(
new ConnectionFactory<K, V>(redisClusterClient, redisCodec, clusterWriter));
Expand Down Expand Up @@ -108,11 +106,13 @@ public RedisAsyncConnectionImpl<K, V> getConnection(Intent intent, String host,
logger.debug("getConnection(" + intent + ", " + host + ", " + port + ")");
}

RedisClusterNode redisClusterNode = getPartition(host, port);
if (validateClusterNodeMembership()) {
RedisClusterNode redisClusterNode = getPartition(host, port);

if (redisClusterNode == null) {
HostAndPort hostAndPort = HostAndPort.fromParts(host, port);
throw invalidConnectionPoint(hostAndPort.toString());
if (redisClusterNode == null) {
HostAndPort hostAndPort = HostAndPort.fromParts(host, port);
throw invalidConnectionPoint(hostAndPort.toString());
}
}

ConnectionKey key = new ConnectionKey(intent, host, port);
Expand Down Expand Up @@ -181,10 +181,8 @@ private void reconfigurePartitions() {

resetWriterCache();

for (ConnectionKey key : staleConnections) {
RedisAsyncConnectionImpl<K, V> connection = connections.getIfPresent(key);
connection.close();
connections.invalidate(key);
if (redisClusterClient.expireStaleConnections()) {
closeStaleConnections();
}
}

Expand Down Expand Up @@ -352,6 +350,11 @@ public int hashCode() {
}
}

private boolean validateClusterNodeMembership() {
return redisClusterClient.getClusterClientOptions() == null
|| redisClusterClient.getClusterClientOptions().isValidateClusterNodeMembership();
}

private class ConnectionFactory<K, V> extends CacheLoader<ConnectionKey, RedisAsyncConnectionImpl<K, V>> {

private final RedisClusterClient redisClusterClient;
Expand Down Expand Up @@ -380,8 +383,11 @@ public RedisAsyncConnectionImpl<K, V> load(ConnectionKey key) throws Exception {
}

if (key.host != null) {
if (getPartition(key.host, key.port) == null) {
throw invalidConnectionPoint(key.host + ":" + key.port);

if (validateClusterNodeMembership()) {
if (getPartition(key.host, key.port) == null) {
throw invalidConnectionPoint(key.host + ":" + key.port);
}
}

// Host and port connections do not provide command recovery due to cluster reconfiguration
Expand All @@ -394,5 +400,6 @@ public RedisAsyncConnectionImpl<K, V> load(ConnectionKey key) throws Exception {
}
return connection;
}

}
}
60 changes: 30 additions & 30 deletions src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java
Original file line number Diff line number Diff line change
@@ -1,30 +1,18 @@
package com.lambdaworks.redis.cluster;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Preconditions.*;
import static com.lambdaworks.redis.cluster.ClusterTopologyRefresh.RedisUriComparator.INSTANCE;

import java.io.Closeable;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.lambdaworks.redis.AbstractRedisClient;
import com.lambdaworks.redis.RedisAsyncConnectionImpl;
import com.lambdaworks.redis.RedisChannelWriter;
import com.lambdaworks.redis.RedisClusterConnection;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.*;
import com.lambdaworks.redis.cluster.models.partitions.Partitions;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;
import com.lambdaworks.redis.codec.RedisCodec;
Expand Down Expand Up @@ -420,12 +408,19 @@ protected <T extends Closeable> void forEachCloseable(Predicate<? super Closeabl
/**
* Set the {@link ClusterClientOptions} for the client.
*
* @param clientOptions
* @param clientOptions client options for the client and connections that are created after setting the options
*/
public void setOptions(ClusterClientOptions clientOptions) {
super.setOptions(clientOptions);
}

ClusterClientOptions getClusterClientOptions() {
if (getOptions() instanceof ClusterClientOptions) {
return (ClusterClientOptions) getOptions();
}
return null;
}

private class ClusterTopologyRefreshTask implements Runnable {

public ClusterTopologyRefreshTask() {
Expand All @@ -434,9 +429,8 @@ public ClusterTopologyRefreshTask() {
@Override
public void run() {
logger.debug("ClusterTopologyRefreshTask.run()");
if (isEventLoopActive() && getOptions() instanceof ClusterClientOptions) {
ClusterClientOptions options = (ClusterClientOptions) getOptions();
if (!options.isRefreshClusterView()) {
if (isEventLoopActive() && getClusterClientOptions() != null) {
if (!getClusterClientOptions().isRefreshClusterView()) {
logger.debug("ClusterTopologyRefreshTask is disabled");
return;
}
Expand All @@ -463,7 +457,7 @@ public void run() {
getPartitions().reload(values.get(0).getPartitions());
updatePartitionsInConnections();

if (isEventLoopActive()) {
if (isEventLoopActive() && expireStaleConnections()) {
genericWorkerPool.submit(new CloseStaleConnectionsTask());
}

Expand All @@ -474,17 +468,23 @@ public void run() {
private class CloseStaleConnectionsTask implements Runnable {
@Override
public void run() {
forEachClusterConnection(new Predicate<RedisAdvancedClusterAsyncConnectionImpl<?, ?>>() {
@Override
public boolean apply(RedisAdvancedClusterAsyncConnectionImpl<?, ?> input) {

ClusterDistributionChannelWriter<?, ?> writer = (ClusterDistributionChannelWriter<?, ?>) input
.getChannelWriter();
writer.getClusterConnectionProvider().closeStaleConnections();
return true;
}
});
if (isEventLoopActive() && expireStaleConnections()) {

forEachClusterConnection(new Predicate<RedisAdvancedClusterAsyncConnectionImpl<?, ?>>() {
@Override
public boolean apply(RedisAdvancedClusterAsyncConnectionImpl<?, ?> input) {

ClusterDistributionChannelWriter<?, ?> writer = (ClusterDistributionChannelWriter<?, ?>) input
.getChannelWriter();
writer.getClusterConnectionProvider().closeStaleConnections();
return true;
}
});
}
}
}

boolean expireStaleConnections() {
return getClusterClientOptions() == null || getClusterClientOptions().isCloseStaleConnections();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.lambdaworks.redis.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -17,12 +18,6 @@
import com.google.code.tempusfugit.temporal.WaitFor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.lambdaworks.redis.LettuceFutures;
import com.lambdaworks.redis.RedisClusterAsyncConnection;
import com.lambdaworks.redis.RedisClusterConnection;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.RedisFuture;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.cluster.models.partitions.Partitions;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;

Expand Down Expand Up @@ -161,17 +156,27 @@ public void forbiddenHostOnRedirect() throws Exception {
}

@Test
public void getConnectionToNotAClusterMember() throws Exception {
public void getConnectionToNotAClusterMemberForbidden() throws Exception {

RedisAdvancedClusterConnection<String, String> sync = clusterClient.connectCluster();
try {
sync.getConnection("8.8.8.8", 1234);
sync.getConnection(TestSettings.host(), TestSettings.port());
} catch (RedisException e) {
assertThat(e).hasRootCauseExactlyInstanceOf(IllegalArgumentException.class);
}
sync.close();
}


@Test
public void getConnectionToNotAClusterMemberAllowed() throws Exception {

clusterClient.setOptions(new ClusterClientOptions.Builder().validateClusterNodeMembership(false).build());
RedisAdvancedClusterConnection<String, String> sync = clusterClient.connectCluster();
sync.getConnection(TestSettings.host(), TestSettings.port());
sync.close();
}

@Test
public void pipelining() throws Exception {

Expand Down
Loading

0 comments on commit 0595ff6

Please sign in to comment.