Skip to content

Commit

Permalink
Reconnect default cluster connection if connected node no longer part…
Browse files Browse the repository at this point in the history
… of the cluster #1317

If the default cluster connection points to a node that is no longer part of the cluster, then the connection is reset to point to a cluster member again. Cluster connection facades therefore are aware of their node Id and once the Partitions get updated, the facade verifies cluster membership. The check isn't considering failure flags, only cluster membership.

The connection reset is tied to ClusterClientOptions.isCloseStaleConnections which can be disabled on demand.
  • Loading branch information
mp911de committed Jul 30, 2020
1 parent 9b9fb14 commit 997cfb3
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,10 @@ public CompletableFuture<Void> closeAsync() {
return Futures.allOf(futures);
}

public void disconnectDefaultEndpoint() {
((DefaultEndpoint) defaultWriter).disconnect();
}

@Override
public void setConnectionFacade(ConnectionFacade redisChannelHandler) {
defaultWriter.setConnectionFacade(redisChannelHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class PooledClusterConnectionProvider<K, V>

private final RedisClusterClient redisClusterClient;

private final ClusterClientOptions options;

private final ClusterNodeConnectionFactory<K, V> connectionFactory;

private final RedisChannelWriter clusterWriter;
Expand All @@ -85,6 +87,7 @@ public PooledClusterConnectionProvider(RedisClusterClient redisClusterClient, Re

this.redisCodec = redisCodec;
this.redisClusterClient = redisClusterClient;
this.options = redisClusterClient.getClusterClientOptions();
this.clusterWriter = clusterWriter;
this.clusterEventListener = clusterEventListener;
this.connectionFactory = new NodeConnectionPostProcessor(getConnectionFactory(redisClusterClient));
Expand Down Expand Up @@ -517,11 +520,15 @@ private void reconfigurePartitions() {

resetFastConnectionCache();

if (redisClusterClient.expireStaleConnections()) {
if (expireStaleConnections()) {
closeStaleConnections();
}
}

private boolean expireStaleConnections() {
return options == null || options.isCloseStaleConnections();
}

/**
* Close stale connections.
*/
Expand Down
19 changes: 7 additions & 12 deletions src/main/java/io/lettuce/core/cluster/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -606,8 +606,6 @@ private <K, V> CompletableFuture<StatefulRedisClusterConnection<K, V>> connectCl

logger.debug("connectCluster(" + initialUris + ")");

Mono<SocketAddress> socketAddressSupplier = getSocketAddressSupplier(TopologyComparators::sortByClientCount);

DefaultEndpoint endpoint = new DefaultEndpoint(getClusterClientOptions(), getResources());
RedisChannelWriter writer = endpoint;

Expand All @@ -630,7 +628,8 @@ private <K, V> CompletableFuture<StatefulRedisClusterConnection<K, V>> connectCl

Supplier<CommandHandler> commandHandlerSupplier = () -> new CommandHandler(getClusterClientOptions(), getResources(),
endpoint);

Mono<SocketAddress> socketAddressSupplier = getSocketAddressSupplier(connection::getPartitions,
TopologyComparators::sortByClientCount);
Mono<StatefulRedisClusterConnectionImpl<K, V>> connectionMono = Mono
.defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier));

Expand Down Expand Up @@ -690,8 +689,6 @@ private <K, V> CompletableFuture<StatefulRedisClusterPubSubConnection<K, V>> con

logger.debug("connectClusterPubSub(" + initialUris + ")");

Mono<SocketAddress> socketAddressSupplier = getSocketAddressSupplier(TopologyComparators::sortByClientCount);

PubSubClusterEndpoint<K, V> endpoint = new PubSubClusterEndpoint<>(getClusterClientOptions(), getResources());
RedisChannelWriter writer = endpoint;

Expand All @@ -713,7 +710,8 @@ private <K, V> CompletableFuture<StatefulRedisClusterPubSubConnection<K, V>> con

Supplier<CommandHandler> commandHandlerSupplier = () -> new PubSubCommandHandler<>(getClusterClientOptions(),
getResources(), codec, endpoint);

Mono<SocketAddress> socketAddressSupplier = getSocketAddressSupplier(connection::getPartitions,
TopologyComparators::sortByClientCount);
Mono<StatefulRedisClusterPubSubConnectionImpl<K, V>> connectionMono = Mono
.defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier));

Expand Down Expand Up @@ -1027,11 +1025,12 @@ protected RedisURI getFirstUri() {
* parameter but create a new collection with the desired order, must not be {@code null}.
* @return {@link Supplier} for {@link SocketAddress connection points}.
*/
protected Mono<SocketAddress> getSocketAddressSupplier(Function<Partitions, Collection<RedisClusterNode>> sortFunction) {
protected Mono<SocketAddress> getSocketAddressSupplier(Supplier<Partitions> partitionsSupplier,
Function<Partitions, Collection<RedisClusterNode>> sortFunction) {

LettuceAssert.notNull(sortFunction, "Sort function must not be null");

final RoundRobinSocketAddressSupplier socketAddressSupplier = new RoundRobinSocketAddressSupplier(partitions,
RoundRobinSocketAddressSupplier socketAddressSupplier = new RoundRobinSocketAddressSupplier(partitionsSupplier,
sortFunction, getResources());

return Mono.defer(() -> {
Expand Down Expand Up @@ -1138,10 +1137,6 @@ ClusterClientOptions getClusterClientOptions() {
return (ClusterClientOptions) getOptions();
}

boolean expireStaleConnections() {
return getClusterClientOptions() == null || getClusterClientOptions().isCloseStaleConnections();
}

protected static <T> CompletableFuture<T> transformAsyncConnectionException(CompletionStage<T> future,
Iterable<RedisURI> target) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.function.Function;
import java.util.function.Supplier;

import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.resource.ClientResources;
Expand All @@ -35,15 +36,15 @@ class RoundRobinSocketAddressSupplier implements Supplier<SocketAddress> {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(RoundRobinSocketAddressSupplier.class);

private final Collection<RedisClusterNode> partitions;
private final Supplier<Partitions> partitions;

private final Function<Collection<RedisClusterNode>, Collection<RedisClusterNode>> sortFunction;

private final ClientResources clientResources;

private RoundRobin<RedisClusterNode> roundRobin;
private final RoundRobin<RedisClusterNode> roundRobin;

public RoundRobinSocketAddressSupplier(Collection<RedisClusterNode> partitions,
public RoundRobinSocketAddressSupplier(Supplier<Partitions> partitions,
Function<? extends Collection<RedisClusterNode>, Collection<RedisClusterNode>> sortFunction,
ClientResources clientResources) {

Expand All @@ -54,21 +55,22 @@ public RoundRobinSocketAddressSupplier(Collection<RedisClusterNode> partitions,
this.roundRobin = new RoundRobin<>();
this.sortFunction = (Function) sortFunction;
this.clientResources = clientResources;
resetRoundRobin();
resetRoundRobin(partitions.get());
}

@Override
public SocketAddress get() {

Partitions partitions = this.partitions.get();
if (!roundRobin.isConsistent(partitions)) {
resetRoundRobin();
resetRoundRobin(partitions);
}

RedisClusterNode redisClusterNode = roundRobin.next();
return getSocketAddress(redisClusterNode);
}

protected void resetRoundRobin() {
protected void resetRoundRobin(Partitions partitions) {
roundRobin.rebuild(sortFunction.apply(partitions));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class StatefulRedisClusterConnectionImpl<K, V> extends RedisChannelHandle

private final ClusterConnectionState connectionState = new ClusterConnectionState();

private Partitions partitions;
private volatile Partitions partitions;

private volatile CommandSet commandSet;

Expand Down Expand Up @@ -189,6 +189,13 @@ public CompletableFuture<StatefulRedisConnection<K, V>> getConnectionAsync(Strin
return provider.getConnectionAsync(ClusterConnectionProvider.Intent.WRITE, host, port);
}

@Override
public void activated() {
super.activated();

async.clusterMyId().thenAccept(connectionState::setNodeId);
}

ClusterDistributionChannelWriter getClusterDistributionChannelWriter() {
return (ClusterDistributionChannelWriter) super.getChannelWriter();
}
Expand Down Expand Up @@ -258,7 +265,19 @@ private <T> RedisCommand<K, V, T> attachOnComplete(RedisCommand<K, V, T> command
}

public void setPartitions(Partitions partitions) {

LettuceAssert.notNull(partitions, "Partitions must not be null");

this.partitions = partitions;

String nodeId = connectionState.getNodeId();
if (nodeId != null && expireStaleConnections()) {

if (partitions.getPartitionByNodeId(nodeId) == null) {
getClusterDistributionChannelWriter().disconnectDefaultEndpoint();
}
}

getClusterDistributionChannelWriter().setPartitions(partitions);
}

Expand All @@ -283,6 +302,8 @@ ConnectionState getConnectionState() {

static class ClusterConnectionState extends ConnectionState {

private volatile String nodeId;

@Override
protected void setUserNamePassword(List<char[]> args) {
super.setUserNamePassword(args);
Expand All @@ -298,6 +319,26 @@ protected void setReadOnly(boolean readOnly) {
super.setReadOnly(readOnly);
}

public String getNodeId() {
return nodeId;
}

public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}

}

private boolean expireStaleConnections() {

ClusterClientOptions options = getClusterClientOptions();
return options == null || options.isCloseStaleConnections();
}

private ClusterClientOptions getClusterClientOptions() {

ClientOptions options = getOptions();
return options instanceof ClusterClientOptions ? (ClusterClientOptions) options : null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.lettuce.core.cluster.pubsub.api.sync.PubSubNodeSelection;
import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.pubsub.RedisPubSubAsyncCommandsImpl;
import io.lettuce.core.pubsub.RedisPubSubReactiveCommandsImpl;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
Expand All @@ -54,6 +55,8 @@ class StatefulRedisClusterPubSubConnectionImpl<K, V> extends StatefulRedisPubSub

private volatile CommandSet commandSet;

private volatile String nodeId;

/**
* Initialize a new connection.
*
Expand Down Expand Up @@ -170,11 +173,38 @@ public CompletableFuture<StatefulRedisPubSubConnection<K, V>> getConnectionAsync
return (CompletableFuture) provider.getConnectionAsync(ClusterConnectionProvider.Intent.WRITE, host, port);
}

@Override
public void activated() {
super.activated();

async.clusterMyId().thenAccept(this::setNodeId);
}

public void setPartitions(Partitions partitions) {

LettuceAssert.notNull(partitions, "Partitions must not be null");

this.partitions = partitions;

String nodeId = getNodeId();
if (nodeId != null && expireStaleConnections()) {

if (partitions.getPartitionByNodeId(nodeId) == null) {
endpoint.disconnect();
}
}

getClusterDistributionChannelWriter().setPartitions(partitions);
}

private String getNodeId() {
return this.nodeId;
}

private void setNodeId(String nodeId) {
this.nodeId = nodeId;
}

public Partitions getPartitions() {
return partitions;
}
Expand Down Expand Up @@ -228,4 +258,16 @@ private RedisURI lookup(String nodeId) {
return null;
}

private boolean expireStaleConnections() {

ClusterClientOptions options = getClusterClientOptions();
return options == null || options.isCloseStaleConnections();
}

private ClusterClientOptions getClusterClientOptions() {

ClientOptions options = getOptions();
return options instanceof ClusterClientOptions ? (ClusterClientOptions) options : null;
}

}
11 changes: 11 additions & 0 deletions src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,18 @@ public CompletableFuture<Void> closeAsync() {
}

return closeFuture;
}

/**
* Disconnect the channel.
*/
public void disconnect() {

Channel channel = this.channel;

if (channel != null && channel.isOpen()) {
channel.disconnect();
}
}

private Channel getOpenChannel() {
Expand Down
Loading

0 comments on commit 997cfb3

Please sign in to comment.