From 997cfb38d9eb5f9c0ec73a2f74c1ea42e07e44aa Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Thu, 30 Jul 2020 11:42:50 +0200 Subject: [PATCH] Reconnect default cluster connection if connected node no longer part of the cluster #1317 If the default cluster connection points to a node that is no longer part of the cluster, then the connection is reset to point to a cluster member again. Cluster connection facades therefore are aware of their node Id and once the Partitions get updated, the facade verifies cluster membership. The check isn't considering failure flags, only cluster membership. The connection reset is tied to ClusterClientOptions.isCloseStaleConnections which can be disabled on demand. --- .../ClusterDistributionChannelWriter.java | 4 ++ .../PooledClusterConnectionProvider.java | 9 ++- .../core/cluster/RedisClusterClient.java | 19 ++--- .../RoundRobinSocketAddressSupplier.java | 14 ++-- .../StatefulRedisClusterConnectionImpl.java | 43 +++++++++++- ...tefulRedisClusterPubSubConnectionImpl.java | 42 +++++++++++ .../core/protocol/DefaultEndpoint.java | 11 +++ .../core/cluster/RedisClusterSetupTest.java | 70 ++++++++++++++++++- ...ndRobinSocketAddressSupplierUnitTests.java | 6 +- 9 files changed, 193 insertions(+), 25 deletions(-) diff --git a/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java b/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java index 8e48b954d8..b50fda0875 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java @@ -380,6 +380,10 @@ public CompletableFuture closeAsync() { return Futures.allOf(futures); } + public void disconnectDefaultEndpoint() { + ((DefaultEndpoint) defaultWriter).disconnect(); + } + @Override public void setConnectionFacade(ConnectionFacade redisChannelHandler) { defaultWriter.setConnectionFacade(redisChannelHandler); diff --git a/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java b/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java index f45a29ccf0..be8b46f571 100644 --- a/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java +++ b/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java @@ -64,6 +64,8 @@ class PooledClusterConnectionProvider private final RedisClusterClient redisClusterClient; + private final ClusterClientOptions options; + private final ClusterNodeConnectionFactory connectionFactory; private final RedisChannelWriter clusterWriter; @@ -85,6 +87,7 @@ public PooledClusterConnectionProvider(RedisClusterClient redisClusterClient, Re this.redisCodec = redisCodec; this.redisClusterClient = redisClusterClient; + this.options = redisClusterClient.getClusterClientOptions(); this.clusterWriter = clusterWriter; this.clusterEventListener = clusterEventListener; this.connectionFactory = new NodeConnectionPostProcessor(getConnectionFactory(redisClusterClient)); @@ -517,11 +520,15 @@ private void reconfigurePartitions() { resetFastConnectionCache(); - if (redisClusterClient.expireStaleConnections()) { + if (expireStaleConnections()) { closeStaleConnections(); } } + private boolean expireStaleConnections() { + return options == null || options.isCloseStaleConnections(); + } + /** * Close stale connections. */ diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java index b107fe58e7..d8af7afb11 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java @@ -606,8 +606,6 @@ private CompletableFuture> connectCl logger.debug("connectCluster(" + initialUris + ")"); - Mono socketAddressSupplier = getSocketAddressSupplier(TopologyComparators::sortByClientCount); - DefaultEndpoint endpoint = new DefaultEndpoint(getClusterClientOptions(), getResources()); RedisChannelWriter writer = endpoint; @@ -630,7 +628,8 @@ private CompletableFuture> connectCl Supplier commandHandlerSupplier = () -> new CommandHandler(getClusterClientOptions(), getResources(), endpoint); - + Mono socketAddressSupplier = getSocketAddressSupplier(connection::getPartitions, + TopologyComparators::sortByClientCount); Mono> connectionMono = Mono .defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier)); @@ -690,8 +689,6 @@ private CompletableFuture> con logger.debug("connectClusterPubSub(" + initialUris + ")"); - Mono socketAddressSupplier = getSocketAddressSupplier(TopologyComparators::sortByClientCount); - PubSubClusterEndpoint endpoint = new PubSubClusterEndpoint<>(getClusterClientOptions(), getResources()); RedisChannelWriter writer = endpoint; @@ -713,7 +710,8 @@ private CompletableFuture> con Supplier commandHandlerSupplier = () -> new PubSubCommandHandler<>(getClusterClientOptions(), getResources(), codec, endpoint); - + Mono socketAddressSupplier = getSocketAddressSupplier(connection::getPartitions, + TopologyComparators::sortByClientCount); Mono> connectionMono = Mono .defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier)); @@ -1027,11 +1025,12 @@ protected RedisURI getFirstUri() { * parameter but create a new collection with the desired order, must not be {@code null}. * @return {@link Supplier} for {@link SocketAddress connection points}. */ - protected Mono getSocketAddressSupplier(Function> sortFunction) { + protected Mono getSocketAddressSupplier(Supplier partitionsSupplier, + Function> sortFunction) { LettuceAssert.notNull(sortFunction, "Sort function must not be null"); - final RoundRobinSocketAddressSupplier socketAddressSupplier = new RoundRobinSocketAddressSupplier(partitions, + RoundRobinSocketAddressSupplier socketAddressSupplier = new RoundRobinSocketAddressSupplier(partitionsSupplier, sortFunction, getResources()); return Mono.defer(() -> { @@ -1138,10 +1137,6 @@ ClusterClientOptions getClusterClientOptions() { return (ClusterClientOptions) getOptions(); } - boolean expireStaleConnections() { - return getClusterClientOptions() == null || getClusterClientOptions().isCloseStaleConnections(); - } - protected static CompletableFuture transformAsyncConnectionException(CompletionStage future, Iterable target) { diff --git a/src/main/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplier.java b/src/main/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplier.java index af9a9d0ab9..701507c5c9 100644 --- a/src/main/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplier.java +++ b/src/main/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplier.java @@ -20,6 +20,7 @@ import java.util.function.Function; import java.util.function.Supplier; +import io.lettuce.core.cluster.models.partitions.Partitions; import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.resource.ClientResources; @@ -35,15 +36,15 @@ class RoundRobinSocketAddressSupplier implements Supplier { private static final InternalLogger logger = InternalLoggerFactory.getInstance(RoundRobinSocketAddressSupplier.class); - private final Collection partitions; + private final Supplier partitions; private final Function, Collection> sortFunction; private final ClientResources clientResources; - private RoundRobin roundRobin; + private final RoundRobin roundRobin; - public RoundRobinSocketAddressSupplier(Collection partitions, + public RoundRobinSocketAddressSupplier(Supplier partitions, Function, Collection> sortFunction, ClientResources clientResources) { @@ -54,21 +55,22 @@ public RoundRobinSocketAddressSupplier(Collection partitions, this.roundRobin = new RoundRobin<>(); this.sortFunction = (Function) sortFunction; this.clientResources = clientResources; - resetRoundRobin(); + resetRoundRobin(partitions.get()); } @Override public SocketAddress get() { + Partitions partitions = this.partitions.get(); if (!roundRobin.isConsistent(partitions)) { - resetRoundRobin(); + resetRoundRobin(partitions); } RedisClusterNode redisClusterNode = roundRobin.next(); return getSocketAddress(redisClusterNode); } - protected void resetRoundRobin() { + protected void resetRoundRobin(Partitions partitions) { roundRobin.rebuild(sortFunction.apply(partitions)); } diff --git a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java index d6f8efc062..b56c3fe98a 100644 --- a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java +++ b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java @@ -72,7 +72,7 @@ public class StatefulRedisClusterConnectionImpl extends RedisChannelHandle private final ClusterConnectionState connectionState = new ClusterConnectionState(); - private Partitions partitions; + private volatile Partitions partitions; private volatile CommandSet commandSet; @@ -189,6 +189,13 @@ public CompletableFuture> getConnectionAsync(Strin return provider.getConnectionAsync(ClusterConnectionProvider.Intent.WRITE, host, port); } + @Override + public void activated() { + super.activated(); + + async.clusterMyId().thenAccept(connectionState::setNodeId); + } + ClusterDistributionChannelWriter getClusterDistributionChannelWriter() { return (ClusterDistributionChannelWriter) super.getChannelWriter(); } @@ -258,7 +265,19 @@ private RedisCommand attachOnComplete(RedisCommand command } public void setPartitions(Partitions partitions) { + + LettuceAssert.notNull(partitions, "Partitions must not be null"); + this.partitions = partitions; + + String nodeId = connectionState.getNodeId(); + if (nodeId != null && expireStaleConnections()) { + + if (partitions.getPartitionByNodeId(nodeId) == null) { + getClusterDistributionChannelWriter().disconnectDefaultEndpoint(); + } + } + getClusterDistributionChannelWriter().setPartitions(partitions); } @@ -283,6 +302,8 @@ ConnectionState getConnectionState() { static class ClusterConnectionState extends ConnectionState { + private volatile String nodeId; + @Override protected void setUserNamePassword(List args) { super.setUserNamePassword(args); @@ -298,6 +319,26 @@ protected void setReadOnly(boolean readOnly) { super.setReadOnly(readOnly); } + public String getNodeId() { + return nodeId; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + } + + private boolean expireStaleConnections() { + + ClusterClientOptions options = getClusterClientOptions(); + return options == null || options.isCloseStaleConnections(); + } + + private ClusterClientOptions getClusterClientOptions() { + + ClientOptions options = getOptions(); + return options instanceof ClusterClientOptions ? (ClusterClientOptions) options : null; } } diff --git a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterPubSubConnectionImpl.java b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterPubSubConnectionImpl.java index 6f3387e98b..2886a61e65 100644 --- a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterPubSubConnectionImpl.java +++ b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterPubSubConnectionImpl.java @@ -33,6 +33,7 @@ import io.lettuce.core.cluster.pubsub.api.sync.PubSubNodeSelection; import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands; import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.pubsub.RedisPubSubAsyncCommandsImpl; import io.lettuce.core.pubsub.RedisPubSubReactiveCommandsImpl; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; @@ -54,6 +55,8 @@ class StatefulRedisClusterPubSubConnectionImpl extends StatefulRedisPubSub private volatile CommandSet commandSet; + private volatile String nodeId; + /** * Initialize a new connection. * @@ -170,11 +173,38 @@ public CompletableFuture> getConnectionAsync return (CompletableFuture) provider.getConnectionAsync(ClusterConnectionProvider.Intent.WRITE, host, port); } + @Override + public void activated() { + super.activated(); + + async.clusterMyId().thenAccept(this::setNodeId); + } + public void setPartitions(Partitions partitions) { + + LettuceAssert.notNull(partitions, "Partitions must not be null"); + this.partitions = partitions; + + String nodeId = getNodeId(); + if (nodeId != null && expireStaleConnections()) { + + if (partitions.getPartitionByNodeId(nodeId) == null) { + endpoint.disconnect(); + } + } + getClusterDistributionChannelWriter().setPartitions(partitions); } + private String getNodeId() { + return this.nodeId; + } + + private void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + public Partitions getPartitions() { return partitions; } @@ -228,4 +258,16 @@ private RedisURI lookup(String nodeId) { return null; } + private boolean expireStaleConnections() { + + ClusterClientOptions options = getClusterClientOptions(); + return options == null || options.isCloseStaleConnections(); + } + + private ClusterClientOptions getClusterClientOptions() { + + ClientOptions options = getOptions(); + return options instanceof ClusterClientOptions ? (ClusterClientOptions) options : null; + } + } diff --git a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java index 950a49e6b7..a5e8b88398 100644 --- a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java +++ b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java @@ -601,7 +601,18 @@ public CompletableFuture closeAsync() { } return closeFuture; + } + + /** + * Disconnect the channel. + */ + public void disconnect() { + Channel channel = this.channel; + + if (channel != null && channel.isOpen()) { + channel.disconnect(); + } } private Channel getOpenChannel() { diff --git a/src/test/java/io/lettuce/core/cluster/RedisClusterSetupTest.java b/src/test/java/io/lettuce/core/cluster/RedisClusterSetupTest.java index 459251e6cc..3cc9050335 100644 --- a/src/test/java/io/lettuce/core/cluster/RedisClusterSetupTest.java +++ b/src/test/java/io/lettuce/core/cluster/RedisClusterSetupTest.java @@ -16,6 +16,7 @@ package io.lettuce.core.cluster; import static io.lettuce.core.cluster.ClusterTestSettings.createSlots; +import static io.lettuce.core.cluster.ClusterTestSettings.port5; import static io.lettuce.core.cluster.ClusterTestUtil.getNodeId; import static io.lettuce.core.cluster.ClusterTestUtil.getOwnPartition; import static org.assertj.core.api.Assertions.assertThat; @@ -346,7 +347,73 @@ public void atLeastOnceForgetNodeFailover() throws Exception { } @Test - public void expireStaleNodeIdConnections() throws Exception { + public void expireStaleDefaultConnection() { + + ClusterSetup.setup2Masters(clusterRule); + + RedisClusterClient redisClusterClient = RedisClusterClient.create(TestClientResources.get(), + RedisURI.Builder.redis(host, port5).build()); + redisClusterClient.setOptions(ClusterClientOptions.builder() + .topologyRefreshOptions(ClusterTopologyRefreshOptions.builder().dynamicRefreshSources(false).build()).build()); + + StatefulRedisClusterConnectionImpl connection = (StatefulRedisClusterConnectionImpl) redisClusterClient + .connect(); + String firstMaster = connection.sync().clusterMyId(); + + RedisClusterNode firstMasterNode = connection.getPartitions().getPartitionByNodeId(firstMaster); + + assertThat(firstMasterNode.getUri().getPort()).isEqualTo(port5); + + redis2.clusterForget(redis1.clusterMyId()); + redis1.clusterForget(redis2.clusterMyId()); + + Partitions partitions = ClusterPartitionParser.parse(redis2.clusterNodes()); + connection.setPartitions(partitions); + + Wait.untilTrue(connection::isOpen).waitOrTimeout(); + + String secondMaster = connection.sync().clusterMyId(); + assertThat(secondMaster).isEqualTo(redis2.clusterMyId()); + connection.close(); + + FastShutdown.shutdown(redisClusterClient); + } + + @Test + public void expireStaleDefaultPubSubConnection() { + + ClusterSetup.setup2Masters(clusterRule); + + RedisClusterClient redisClusterClient = RedisClusterClient.create(TestClientResources.get(), + RedisURI.Builder.redis(host, port5).build()); + redisClusterClient.setOptions(ClusterClientOptions.builder() + .topologyRefreshOptions(ClusterTopologyRefreshOptions.builder().dynamicRefreshSources(false).build()).build()); + + StatefulRedisClusterPubSubConnectionImpl connection = (StatefulRedisClusterPubSubConnectionImpl) redisClusterClient + .connectPubSub(); + String firstMaster = connection.sync().clusterMyId(); + + RedisClusterNode firstMasterNode = connection.getPartitions().getPartitionByNodeId(firstMaster); + + assertThat(firstMasterNode.getUri().getPort()).isEqualTo(port5); + + redis2.clusterForget(redis1.clusterMyId()); + redis1.clusterForget(redis2.clusterMyId()); + + Partitions partitions = ClusterPartitionParser.parse(redis2.clusterNodes()); + connection.setPartitions(partitions); + + Wait.untilTrue(connection::isOpen).waitOrTimeout(); + + String secondMaster = connection.sync().clusterMyId(); + assertThat(secondMaster).isEqualTo(redis2.clusterMyId()); + connection.close(); + + FastShutdown.shutdown(redisClusterClient); + } + + @Test + public void expireStaleNodeIdConnections() { clusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(PERIODIC_REFRESH_ENABLED).build()); RedisAdvancedClusterAsyncCommands clusterConnection = clusterClient.connect().async(); @@ -379,7 +446,6 @@ public void expireStaleNodeIdConnections() throws Exception { Wait.untilEquals(1, () -> clusterConnectionProvider.getConnectionCount()).waitOrTimeout(); clusterConnection.getStatefulConnection().close(); - } private void assertRoutedExecution(RedisClusterAsyncCommands clusterConnection) { diff --git a/src/test/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplierUnitTests.java b/src/test/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplierUnitTests.java index a64fe3599c..f409e7207f 100644 --- a/src/test/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplierUnitTests.java +++ b/src/test/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplierUnitTests.java @@ -74,7 +74,7 @@ void before() { @Test void noOffset() { - RoundRobinSocketAddressSupplier sut = new RoundRobinSocketAddressSupplier(partitions, + RoundRobinSocketAddressSupplier sut = new RoundRobinSocketAddressSupplier(() -> partitions, redisClusterNodes -> redisClusterNodes, clientResourcesMock); assertThat(sut.get()).isEqualTo(addr1); @@ -88,7 +88,7 @@ void noOffset() { @Test void partitionTableChangesNewNode() { - RoundRobinSocketAddressSupplier sut = new RoundRobinSocketAddressSupplier(partitions, + RoundRobinSocketAddressSupplier sut = new RoundRobinSocketAddressSupplier(() -> partitions, redisClusterNodes -> redisClusterNodes, clientResourcesMock); assertThat(sut.get()).isEqualTo(addr1); @@ -106,7 +106,7 @@ void partitionTableChangesNewNode() { @Test void partitionTableChangesNodeRemoved() { - RoundRobinSocketAddressSupplier sut = new RoundRobinSocketAddressSupplier(partitions, + RoundRobinSocketAddressSupplier sut = new RoundRobinSocketAddressSupplier(() -> partitions, redisClusterNodes -> redisClusterNodes, clientResourcesMock); assertThat(sut.get()).isEqualTo(addr1);