diff --git a/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java b/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java index 8e48b954d8..b50fda0875 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java @@ -380,6 +380,10 @@ public CompletableFuture closeAsync() { return Futures.allOf(futures); } + public void disconnectDefaultEndpoint() { + ((DefaultEndpoint) defaultWriter).disconnect(); + } + @Override public void setConnectionFacade(ConnectionFacade redisChannelHandler) { defaultWriter.setConnectionFacade(redisChannelHandler); diff --git a/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java b/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java index f45a29ccf0..be8b46f571 100644 --- a/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java +++ b/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java @@ -64,6 +64,8 @@ class PooledClusterConnectionProvider private final RedisClusterClient redisClusterClient; + private final ClusterClientOptions options; + private final ClusterNodeConnectionFactory connectionFactory; private final RedisChannelWriter clusterWriter; @@ -85,6 +87,7 @@ public PooledClusterConnectionProvider(RedisClusterClient redisClusterClient, Re this.redisCodec = redisCodec; this.redisClusterClient = redisClusterClient; + this.options = redisClusterClient.getClusterClientOptions(); this.clusterWriter = clusterWriter; this.clusterEventListener = clusterEventListener; this.connectionFactory = new NodeConnectionPostProcessor(getConnectionFactory(redisClusterClient)); @@ -517,11 +520,15 @@ private void reconfigurePartitions() { resetFastConnectionCache(); - if (redisClusterClient.expireStaleConnections()) { + if (expireStaleConnections()) { closeStaleConnections(); } } + private boolean expireStaleConnections() { + return options == null || options.isCloseStaleConnections(); + } + /** * Close stale connections. */ diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java index b107fe58e7..d8af7afb11 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java @@ -606,8 +606,6 @@ private CompletableFuture> connectCl logger.debug("connectCluster(" + initialUris + ")"); - Mono socketAddressSupplier = getSocketAddressSupplier(TopologyComparators::sortByClientCount); - DefaultEndpoint endpoint = new DefaultEndpoint(getClusterClientOptions(), getResources()); RedisChannelWriter writer = endpoint; @@ -630,7 +628,8 @@ private CompletableFuture> connectCl Supplier commandHandlerSupplier = () -> new CommandHandler(getClusterClientOptions(), getResources(), endpoint); - + Mono socketAddressSupplier = getSocketAddressSupplier(connection::getPartitions, + TopologyComparators::sortByClientCount); Mono> connectionMono = Mono .defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier)); @@ -690,8 +689,6 @@ private CompletableFuture> con logger.debug("connectClusterPubSub(" + initialUris + ")"); - Mono socketAddressSupplier = getSocketAddressSupplier(TopologyComparators::sortByClientCount); - PubSubClusterEndpoint endpoint = new PubSubClusterEndpoint<>(getClusterClientOptions(), getResources()); RedisChannelWriter writer = endpoint; @@ -713,7 +710,8 @@ private CompletableFuture> con Supplier commandHandlerSupplier = () -> new PubSubCommandHandler<>(getClusterClientOptions(), getResources(), codec, endpoint); - + Mono socketAddressSupplier = getSocketAddressSupplier(connection::getPartitions, + TopologyComparators::sortByClientCount); Mono> connectionMono = Mono .defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier)); @@ -1027,11 +1025,12 @@ protected RedisURI getFirstUri() { * parameter but create a new collection with the desired order, must not be {@code null}. * @return {@link Supplier} for {@link SocketAddress connection points}. */ - protected Mono getSocketAddressSupplier(Function> sortFunction) { + protected Mono getSocketAddressSupplier(Supplier partitionsSupplier, + Function> sortFunction) { LettuceAssert.notNull(sortFunction, "Sort function must not be null"); - final RoundRobinSocketAddressSupplier socketAddressSupplier = new RoundRobinSocketAddressSupplier(partitions, + RoundRobinSocketAddressSupplier socketAddressSupplier = new RoundRobinSocketAddressSupplier(partitionsSupplier, sortFunction, getResources()); return Mono.defer(() -> { @@ -1138,10 +1137,6 @@ ClusterClientOptions getClusterClientOptions() { return (ClusterClientOptions) getOptions(); } - boolean expireStaleConnections() { - return getClusterClientOptions() == null || getClusterClientOptions().isCloseStaleConnections(); - } - protected static CompletableFuture transformAsyncConnectionException(CompletionStage future, Iterable target) { diff --git a/src/main/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplier.java b/src/main/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplier.java index af9a9d0ab9..701507c5c9 100644 --- a/src/main/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplier.java +++ b/src/main/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplier.java @@ -20,6 +20,7 @@ import java.util.function.Function; import java.util.function.Supplier; +import io.lettuce.core.cluster.models.partitions.Partitions; import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.resource.ClientResources; @@ -35,15 +36,15 @@ class RoundRobinSocketAddressSupplier implements Supplier { private static final InternalLogger logger = InternalLoggerFactory.getInstance(RoundRobinSocketAddressSupplier.class); - private final Collection partitions; + private final Supplier partitions; private final Function, Collection> sortFunction; private final ClientResources clientResources; - private RoundRobin roundRobin; + private final RoundRobin roundRobin; - public RoundRobinSocketAddressSupplier(Collection partitions, + public RoundRobinSocketAddressSupplier(Supplier partitions, Function, Collection> sortFunction, ClientResources clientResources) { @@ -54,21 +55,22 @@ public RoundRobinSocketAddressSupplier(Collection partitions, this.roundRobin = new RoundRobin<>(); this.sortFunction = (Function) sortFunction; this.clientResources = clientResources; - resetRoundRobin(); + resetRoundRobin(partitions.get()); } @Override public SocketAddress get() { + Partitions partitions = this.partitions.get(); if (!roundRobin.isConsistent(partitions)) { - resetRoundRobin(); + resetRoundRobin(partitions); } RedisClusterNode redisClusterNode = roundRobin.next(); return getSocketAddress(redisClusterNode); } - protected void resetRoundRobin() { + protected void resetRoundRobin(Partitions partitions) { roundRobin.rebuild(sortFunction.apply(partitions)); } diff --git a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java index d6f8efc062..b56c3fe98a 100644 --- a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java +++ b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java @@ -72,7 +72,7 @@ public class StatefulRedisClusterConnectionImpl extends RedisChannelHandle private final ClusterConnectionState connectionState = new ClusterConnectionState(); - private Partitions partitions; + private volatile Partitions partitions; private volatile CommandSet commandSet; @@ -189,6 +189,13 @@ public CompletableFuture> getConnectionAsync(Strin return provider.getConnectionAsync(ClusterConnectionProvider.Intent.WRITE, host, port); } + @Override + public void activated() { + super.activated(); + + async.clusterMyId().thenAccept(connectionState::setNodeId); + } + ClusterDistributionChannelWriter getClusterDistributionChannelWriter() { return (ClusterDistributionChannelWriter) super.getChannelWriter(); } @@ -258,7 +265,19 @@ private RedisCommand attachOnComplete(RedisCommand command } public void setPartitions(Partitions partitions) { + + LettuceAssert.notNull(partitions, "Partitions must not be null"); + this.partitions = partitions; + + String nodeId = connectionState.getNodeId(); + if (nodeId != null && expireStaleConnections()) { + + if (partitions.getPartitionByNodeId(nodeId) == null) { + getClusterDistributionChannelWriter().disconnectDefaultEndpoint(); + } + } + getClusterDistributionChannelWriter().setPartitions(partitions); } @@ -283,6 +302,8 @@ ConnectionState getConnectionState() { static class ClusterConnectionState extends ConnectionState { + private volatile String nodeId; + @Override protected void setUserNamePassword(List args) { super.setUserNamePassword(args); @@ -298,6 +319,26 @@ protected void setReadOnly(boolean readOnly) { super.setReadOnly(readOnly); } + public String getNodeId() { + return nodeId; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + } + + private boolean expireStaleConnections() { + + ClusterClientOptions options = getClusterClientOptions(); + return options == null || options.isCloseStaleConnections(); + } + + private ClusterClientOptions getClusterClientOptions() { + + ClientOptions options = getOptions(); + return options instanceof ClusterClientOptions ? (ClusterClientOptions) options : null; } } diff --git a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterPubSubConnectionImpl.java b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterPubSubConnectionImpl.java index 6f3387e98b..2886a61e65 100644 --- a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterPubSubConnectionImpl.java +++ b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterPubSubConnectionImpl.java @@ -33,6 +33,7 @@ import io.lettuce.core.cluster.pubsub.api.sync.PubSubNodeSelection; import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands; import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.pubsub.RedisPubSubAsyncCommandsImpl; import io.lettuce.core.pubsub.RedisPubSubReactiveCommandsImpl; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; @@ -54,6 +55,8 @@ class StatefulRedisClusterPubSubConnectionImpl extends StatefulRedisPubSub private volatile CommandSet commandSet; + private volatile String nodeId; + /** * Initialize a new connection. * @@ -170,11 +173,38 @@ public CompletableFuture> getConnectionAsync return (CompletableFuture) provider.getConnectionAsync(ClusterConnectionProvider.Intent.WRITE, host, port); } + @Override + public void activated() { + super.activated(); + + async.clusterMyId().thenAccept(this::setNodeId); + } + public void setPartitions(Partitions partitions) { + + LettuceAssert.notNull(partitions, "Partitions must not be null"); + this.partitions = partitions; + + String nodeId = getNodeId(); + if (nodeId != null && expireStaleConnections()) { + + if (partitions.getPartitionByNodeId(nodeId) == null) { + endpoint.disconnect(); + } + } + getClusterDistributionChannelWriter().setPartitions(partitions); } + private String getNodeId() { + return this.nodeId; + } + + private void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + public Partitions getPartitions() { return partitions; } @@ -228,4 +258,16 @@ private RedisURI lookup(String nodeId) { return null; } + private boolean expireStaleConnections() { + + ClusterClientOptions options = getClusterClientOptions(); + return options == null || options.isCloseStaleConnections(); + } + + private ClusterClientOptions getClusterClientOptions() { + + ClientOptions options = getOptions(); + return options instanceof ClusterClientOptions ? (ClusterClientOptions) options : null; + } + } diff --git a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java index 950a49e6b7..a5e8b88398 100644 --- a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java +++ b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java @@ -601,7 +601,18 @@ public CompletableFuture closeAsync() { } return closeFuture; + } + + /** + * Disconnect the channel. + */ + public void disconnect() { + Channel channel = this.channel; + + if (channel != null && channel.isOpen()) { + channel.disconnect(); + } } private Channel getOpenChannel() { diff --git a/src/test/java/io/lettuce/core/cluster/RedisClusterSetupTest.java b/src/test/java/io/lettuce/core/cluster/RedisClusterSetupTest.java index 459251e6cc..3cc9050335 100644 --- a/src/test/java/io/lettuce/core/cluster/RedisClusterSetupTest.java +++ b/src/test/java/io/lettuce/core/cluster/RedisClusterSetupTest.java @@ -16,6 +16,7 @@ package io.lettuce.core.cluster; import static io.lettuce.core.cluster.ClusterTestSettings.createSlots; +import static io.lettuce.core.cluster.ClusterTestSettings.port5; import static io.lettuce.core.cluster.ClusterTestUtil.getNodeId; import static io.lettuce.core.cluster.ClusterTestUtil.getOwnPartition; import static org.assertj.core.api.Assertions.assertThat; @@ -346,7 +347,73 @@ public void atLeastOnceForgetNodeFailover() throws Exception { } @Test - public void expireStaleNodeIdConnections() throws Exception { + public void expireStaleDefaultConnection() { + + ClusterSetup.setup2Masters(clusterRule); + + RedisClusterClient redisClusterClient = RedisClusterClient.create(TestClientResources.get(), + RedisURI.Builder.redis(host, port5).build()); + redisClusterClient.setOptions(ClusterClientOptions.builder() + .topologyRefreshOptions(ClusterTopologyRefreshOptions.builder().dynamicRefreshSources(false).build()).build()); + + StatefulRedisClusterConnectionImpl connection = (StatefulRedisClusterConnectionImpl) redisClusterClient + .connect(); + String firstMaster = connection.sync().clusterMyId(); + + RedisClusterNode firstMasterNode = connection.getPartitions().getPartitionByNodeId(firstMaster); + + assertThat(firstMasterNode.getUri().getPort()).isEqualTo(port5); + + redis2.clusterForget(redis1.clusterMyId()); + redis1.clusterForget(redis2.clusterMyId()); + + Partitions partitions = ClusterPartitionParser.parse(redis2.clusterNodes()); + connection.setPartitions(partitions); + + Wait.untilTrue(connection::isOpen).waitOrTimeout(); + + String secondMaster = connection.sync().clusterMyId(); + assertThat(secondMaster).isEqualTo(redis2.clusterMyId()); + connection.close(); + + FastShutdown.shutdown(redisClusterClient); + } + + @Test + public void expireStaleDefaultPubSubConnection() { + + ClusterSetup.setup2Masters(clusterRule); + + RedisClusterClient redisClusterClient = RedisClusterClient.create(TestClientResources.get(), + RedisURI.Builder.redis(host, port5).build()); + redisClusterClient.setOptions(ClusterClientOptions.builder() + .topologyRefreshOptions(ClusterTopologyRefreshOptions.builder().dynamicRefreshSources(false).build()).build()); + + StatefulRedisClusterPubSubConnectionImpl connection = (StatefulRedisClusterPubSubConnectionImpl) redisClusterClient + .connectPubSub(); + String firstMaster = connection.sync().clusterMyId(); + + RedisClusterNode firstMasterNode = connection.getPartitions().getPartitionByNodeId(firstMaster); + + assertThat(firstMasterNode.getUri().getPort()).isEqualTo(port5); + + redis2.clusterForget(redis1.clusterMyId()); + redis1.clusterForget(redis2.clusterMyId()); + + Partitions partitions = ClusterPartitionParser.parse(redis2.clusterNodes()); + connection.setPartitions(partitions); + + Wait.untilTrue(connection::isOpen).waitOrTimeout(); + + String secondMaster = connection.sync().clusterMyId(); + assertThat(secondMaster).isEqualTo(redis2.clusterMyId()); + connection.close(); + + FastShutdown.shutdown(redisClusterClient); + } + + @Test + public void expireStaleNodeIdConnections() { clusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(PERIODIC_REFRESH_ENABLED).build()); RedisAdvancedClusterAsyncCommands clusterConnection = clusterClient.connect().async(); @@ -379,7 +446,6 @@ public void expireStaleNodeIdConnections() throws Exception { Wait.untilEquals(1, () -> clusterConnectionProvider.getConnectionCount()).waitOrTimeout(); clusterConnection.getStatefulConnection().close(); - } private void assertRoutedExecution(RedisClusterAsyncCommands clusterConnection) { diff --git a/src/test/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplierUnitTests.java b/src/test/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplierUnitTests.java index a64fe3599c..f409e7207f 100644 --- a/src/test/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplierUnitTests.java +++ b/src/test/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplierUnitTests.java @@ -74,7 +74,7 @@ void before() { @Test void noOffset() { - RoundRobinSocketAddressSupplier sut = new RoundRobinSocketAddressSupplier(partitions, + RoundRobinSocketAddressSupplier sut = new RoundRobinSocketAddressSupplier(() -> partitions, redisClusterNodes -> redisClusterNodes, clientResourcesMock); assertThat(sut.get()).isEqualTo(addr1); @@ -88,7 +88,7 @@ void noOffset() { @Test void partitionTableChangesNewNode() { - RoundRobinSocketAddressSupplier sut = new RoundRobinSocketAddressSupplier(partitions, + RoundRobinSocketAddressSupplier sut = new RoundRobinSocketAddressSupplier(() -> partitions, redisClusterNodes -> redisClusterNodes, clientResourcesMock); assertThat(sut.get()).isEqualTo(addr1); @@ -106,7 +106,7 @@ void partitionTableChangesNewNode() { @Test void partitionTableChangesNodeRemoved() { - RoundRobinSocketAddressSupplier sut = new RoundRobinSocketAddressSupplier(partitions, + RoundRobinSocketAddressSupplier sut = new RoundRobinSocketAddressSupplier(() -> partitions, redisClusterNodes -> redisClusterNodes, clientResourcesMock); assertThat(sut.get()).isEqualTo(addr1);