From d0c6ee3a282274ea8bfcff946fb6404f4b339e91 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Sun, 26 Jul 2015 12:56:57 +0200 Subject: [PATCH] NodeId-bound cluster connections #104 Motivation: Redis cluster nodes can be identified by three-and-a-half means: * NodeId * Host and Port * Slot ** The half: Master/slave state for a certain slot The identification details can be moved/changed at runtime. Most prominent examples are slots and master/slave state. A certain nodeId can be moved as well from one host/port to another one. The previous implementation did not care too much about that fact; all connections were identified by host and port. While moving a certain nodeId from one host to another is quite unlikely, it still might happen. The connection pool, therefore, distinguishes now between host and port-bound and nodeId-bound connections. Host and port-bound connections stick to the particular host/port. NodeId-bound connections are reconfigured once the cluster topology changes. Another effect of the change is, the connection management can double the number connections because connections are not shared amongst the identifier classes. --- .../ClusterDistributionChannelWriter.java | 10 +- .../cluster/ClusterNodeCommandHandler.java | 106 ++++++ .../PooledClusterConnectionProvider.java | 351 ++++++++++++------ .../RedisAdvancedClusterAsyncConnection.java | 21 +- .../RedisAdvancedClusterConnection.java | 17 +- .../redis/cluster/RedisClusterClient.java | 44 ++- .../api/StatefulRedisClusterConnection.java | 24 +- .../redis/protocol/CommandHandler.java | 12 +- 8 files changed, 425 insertions(+), 160 deletions(-) create mode 100644 src/main/java/com/lambdaworks/redis/cluster/ClusterNodeCommandHandler.java diff --git a/src/main/java/com/lambdaworks/redis/cluster/ClusterDistributionChannelWriter.java b/src/main/java/com/lambdaworks/redis/cluster/ClusterDistributionChannelWriter.java index 2b7f8d4cb6..acde8560a2 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/ClusterDistributionChannelWriter.java +++ b/src/main/java/com/lambdaworks/redis/cluster/ClusterDistributionChannelWriter.java @@ -12,6 +12,8 @@ import com.lambdaworks.redis.RedisException; import com.lambdaworks.redis.api.StatefulRedisConnection; import com.lambdaworks.redis.cluster.models.partitions.Partitions; +import com.lambdaworks.redis.cluster.models.partitions.Partitions; +import com.lambdaworks.redis.protocol.Command; import com.lambdaworks.redis.protocol.CommandArgs; import com.lambdaworks.redis.protocol.CommandKeyword; import com.lambdaworks.redis.protocol.RedisCommand; @@ -31,10 +33,8 @@ class ClusterDistributionChannelWriter implements RedisChannelWriter private boolean closed = false; private int executionLimit = 5; - public ClusterDistributionChannelWriter(RedisChannelWriter defaultWriter, - ClusterConnectionProvider clusterConnectionProvider) { + public ClusterDistributionChannelWriter(RedisChannelWriter defaultWriter) { this.defaultWriter = defaultWriter; - this.clusterConnectionProvider = clusterConnectionProvider; } @Override @@ -174,6 +174,10 @@ public void reset() { clusterConnectionProvider.reset(); } + public void setClusterConnectionProvider(ClusterConnectionProvider clusterConnectionProvider) { + this.clusterConnectionProvider = clusterConnectionProvider; + } + public void setPartitions(Partitions partitions) { clusterConnectionProvider.setPartitions(partitions); } diff --git a/src/main/java/com/lambdaworks/redis/cluster/ClusterNodeCommandHandler.java b/src/main/java/com/lambdaworks/redis/cluster/ClusterNodeCommandHandler.java new file mode 100644 index 0000000000..b4893f7857 --- /dev/null +++ b/src/main/java/com/lambdaworks/redis/cluster/ClusterNodeCommandHandler.java @@ -0,0 +1,106 @@ +package com.lambdaworks.redis.cluster; + +import java.util.Queue; +import java.util.Set; + +import com.google.common.collect.ImmutableSet; +import com.lambdaworks.redis.ClientOptions; +import com.lambdaworks.redis.RedisChannelWriter; +import com.lambdaworks.redis.RedisException; +import com.lambdaworks.redis.protocol.CommandHandler; +import com.lambdaworks.redis.protocol.ConnectionWatchdog; +import com.lambdaworks.redis.protocol.RedisCommand; + +import io.netty.channel.ChannelHandler; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +/** + * @author Mark Paluch + */ +@ChannelHandler.Sharable +class ClusterNodeCommandHandler extends CommandHandler { + + private static final InternalLogger logger = InternalLoggerFactory.getInstance(ClusterNodeCommandHandler.class); + private static final Set CHANNEL_OPEN_STATES = ImmutableSet.of(LifecycleState.ACTIVATING, + LifecycleState.ACTIVE, LifecycleState.CONNECTED); + + private final RedisChannelWriter clusterChannelWriter; + + /** + * Initialize a new instance that handles commands from the supplied queue. + * + * @param clientOptions client options for this connection + * @param queue The command queue + * @param clusterChannelWriter top-most channel writer. + */ + public ClusterNodeCommandHandler(ClientOptions clientOptions, Queue> queue, + RedisChannelWriter clusterChannelWriter) { + super(clientOptions, queue); + this.clusterChannelWriter = clusterChannelWriter; + } + + /** + * Prepare the closing of the channel. + */ + public void prepareClose() { + if (channel != null) { + ConnectionWatchdog connectionWatchdog = channel.pipeline().get(ConnectionWatchdog.class); + if (connectionWatchdog != null) { + connectionWatchdog.setReconnectSuspended(true); + } + } + } + + /** + * Move queued and buffered commands from the inactive connection to the master command writer. This is done only if the + * current connection is disconnected and auto-reconnect is enabled (command-retries). If the connection would be open, we + * could get into a race that the commands we're moving are right now in processing. Alive connections can handle redirects + * and retries on their own. + */ + @Override + public void close() { + + logger.debug("{} close()", logPrefix()); + + if (clusterChannelWriter != null) { + if (isAutoReconnect() && !CHANNEL_OPEN_STATES.contains(getState())) { + for (RedisCommand queuedCommand : queue) { + try { + clusterChannelWriter.write(queuedCommand); + } catch (RedisException e) { + queuedCommand.completeExceptionally(e); + queuedCommand.complete(); + } + } + + queue.clear(); + } + + for (RedisCommand queuedCommand : commandBuffer) { + try { + clusterChannelWriter.write(queuedCommand); + } catch (RedisException e) { + queuedCommand.completeExceptionally(e); + } + } + + commandBuffer.clear(); + } + + super.close(); + } + + public boolean isAutoReconnect() { + return clientOptions.isAutoReconnect(); + } + + public boolean isQueueEmpty() { + if (queue.isEmpty() && commandBuffer.isEmpty()) { + return true; + } + + return false; + } + +} diff --git a/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java b/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java index 493514b2fd..f200e30a22 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java +++ b/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java @@ -6,6 +6,7 @@ import java.util.Map; import java.util.Set; +import com.google.common.base.Supplier; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -14,7 +15,8 @@ import com.google.common.collect.Sets; import com.google.common.net.HostAndPort; import com.google.common.util.concurrent.UncheckedExecutionException; -import com.lambdaworks.redis.LettuceStrings; +import com.lambdaworks.redis.RedisChannelHandler; +import com.lambdaworks.redis.RedisChannelWriter; import com.lambdaworks.redis.RedisException; import com.lambdaworks.redis.RedisURI; import com.lambdaworks.redis.api.StatefulRedisConnection; @@ -36,7 +38,8 @@ class PooledClusterConnectionProvider implements ClusterConnectionProvider { private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledClusterConnectionProvider.class); - private final LoadingCache> connections; + // Contains NodeId-identified and HostAndPort-identified connections. + private final LoadingCache> connections; private final boolean debugEnabled; private final StatefulRedisConnection writers[] = new StatefulRedisConnection[SlotHash.SLOT_COUNT]; private Partitions partitions; @@ -44,31 +47,11 @@ class PooledClusterConnectionProvider implements ClusterConnectionProvider private boolean autoFlushCommands = true; private Object stateLock = new Object(); - public PooledClusterConnectionProvider(final RedisClusterClient redisClusterClient, final RedisCodec redisCodec) { + public PooledClusterConnectionProvider(RedisClusterClient redisClusterClient, RedisChannelWriter clusterWriter, + RedisCodec redisCodec) { this.debugEnabled = logger.isDebugEnabled(); - this.connections = CacheBuilder.newBuilder().build(new CacheLoader>() { - @Override - public StatefulRedisConnection load(PoolKey key) throws Exception { - - Set redisUris = getConnectionPointsOfAllNodes(); - HostAndPort hostAndPort = HostAndPort.fromParts(key.host, key.port); - - if (!redisUris.contains(hostAndPort)) { - throw new IllegalArgumentException("Connection to " + hostAndPort - + " not allowed. This connection point is not known in the cluster view"); - } - - StatefulRedisConnection connection = redisClusterClient.connectToNode(redisCodec, key.getSocketAddress()); - if (key.getIntent() == Intent.READ) { - connection.sync().readOnly(); - } - - synchronized (stateLock) { - connection.setAutoFlushCommands(autoFlushCommands); - } - return connection; - } - }); + this.connections = CacheBuilder.newBuilder().build( + new ConnectionFactory(redisClusterClient, redisCodec, clusterWriter)); } @Override @@ -78,7 +61,13 @@ public StatefulRedisConnection getConnection(Intent intent, int slot) { logger.debug("getConnection(" + intent + ", " + slot + ")"); } - StatefulRedisConnection writer = writers[slot]; + StatefulRedisConnection writer; + + // avoid races when reconfiguring partitions. + synchronized (stateLock) { + writer = writers[slot]; + } + if (writer == null) { RedisClusterNode partition = partitions.getPartitionBySlot(slot); if (partition == null) { @@ -86,8 +75,8 @@ public StatefulRedisConnection getConnection(Intent intent, int slot) { } try { - PoolKey key = new PoolKey(intent, partition.getUri()); - return writers[slot] = getConnection(key); + ConnectionKey key = new ConnectionKey(intent, partition.getNodeId()); + return writers[slot] = connections.get(key); } catch (UncheckedExecutionException e) { throw new RedisException(e.getCause()); } catch (Exception e) { @@ -97,10 +86,6 @@ public StatefulRedisConnection getConnection(Intent intent, int slot) { return writer; } - private StatefulRedisConnection getConnection(PoolKey key) throws java.util.concurrent.ExecutionException { - return connections.get(key); - } - @Override @SuppressWarnings({ "unchecked", "hiding", "rawtypes" }) public StatefulRedisConnection getConnection(Intent intent, String host, int port) { @@ -108,8 +93,16 @@ public StatefulRedisConnection getConnection(Intent intent, String host, i if (debugEnabled) { logger.debug("getConnection(" + intent + ", " + host + ", " + port + ")"); } - PoolKey key = new PoolKey(intent, host, port); - return getConnection(key); + + RedisClusterNode redisClusterNode = getPartition(host, port); + + if (redisClusterNode == null) { + HostAndPort hostAndPort = HostAndPort.fromParts(host, port); + throw invalidConnectionPoint(hostAndPort.toString()); + } + + ConnectionKey key = new ConnectionKey(intent, host, port); + return connections.get(key); } catch (UncheckedExecutionException e) { throw new RedisException(e.getCause()); } catch (Exception e) { @@ -117,11 +110,21 @@ public StatefulRedisConnection getConnection(Intent intent, String host, i } } + private RedisClusterNode getPartition(String host, int port) { + for (RedisClusterNode partition : partitions) { + RedisURI uri = partition.getUri(); + if (port == uri.getPort() && host.equals(uri.getHost())) { + return partition; + } + } + return null; + } + @Override public void close() { - ImmutableMap> copy = ImmutableMap.copyOf(this.connections.asMap()); + ImmutableMap> copy = ImmutableMap.copyOf(this.connections.asMap()); this.connections.invalidateAll(); - resetPartitions(); + resetWriterCache(); for (StatefulRedisConnection kvRedisAsyncConnection : copy.values()) { if (kvRedisAsyncConnection.isOpen()) { kvRedisAsyncConnection.close(); @@ -131,116 +134,98 @@ public void close() { @Override public void reset() { - ImmutableMap> copy = ImmutableMap.copyOf(this.connections.asMap()); + ImmutableMap> copy = ImmutableMap.copyOf(this.connections.asMap()); for (StatefulRedisConnection kvRedisAsyncConnection : copy.values()) { kvRedisAsyncConnection.reset(); } } - private static class PoolKey { - private final ClusterConnectionProvider.Intent intent; - private SocketAddress socketAddress; - private final String host; - private final int port; - - private PoolKey(ClusterConnectionProvider.Intent intent, RedisURI uri) { - this.intent = intent; - this.host = uri.getHost(); - this.port = uri.getPort(); - this.socketAddress = uri.getResolvedAddress(); + /** + * Synchronize on {@code stateLock} to initiate a happens-before relation and clear the thread caches of other threads. + * + * @param partitions the new partitions. + */ + @Override + public void setPartitions(Partitions partitions) { + synchronized (stateLock) { + this.partitions = partitions; + reconfigurePartitions(); } + } - private PoolKey(Intent intent, String host, int port) { - this.intent = intent; - this.host = host; - this.port = port; - } + private void reconfigurePartitions() { + Set staleConnections = getStaleConnectionKeys(); - public ClusterConnectionProvider.Intent getIntent() { - return intent; - } + for (ConnectionKey key : staleConnections) { + StatefulRedisConnection connection = connections.getIfPresent(key); - public SocketAddress getSocketAddress() { + RedisChannelHandler redisChannelHandler = (RedisChannelHandler) connection; - if (socketAddress == null && LettuceStrings.isNotEmpty(host)) { - socketAddress = new InetSocketAddress(host, port); + if (redisChannelHandler.getChannelWriter() instanceof ClusterNodeCommandHandler) { + ClusterNodeCommandHandler clusterNodeCommandHandler = (ClusterNodeCommandHandler) redisChannelHandler + .getChannelWriter(); + clusterNodeCommandHandler.prepareClose(); } - - return socketAddress; } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof PoolKey)) { - return false; - } - - PoolKey poolKey = (PoolKey) o; - - if (port != poolKey.port) { - return false; - } - if (host != null ? !host.equals(poolKey.host) : poolKey.host != null) { - return false; - } - if (intent != poolKey.intent) { - return false; - } + resetWriterCache(); - return true; - } - - @Override - public int hashCode() { - int result = intent != null ? intent.hashCode() : 0; - result = 31 * result + (host != null ? host.hashCode() : 0); - result = 31 * result + port; - return result; + for (ConnectionKey key : staleConnections) { + StatefulRedisConnection connection = connections.getIfPresent(key); + connection.close(); + connections.invalidate(key); } } - @Override - public void setPartitions(Partitions partitions) { - this.partitions = partitions; - resetPartitions(); - } - + /** + * Close stale connections. + */ @Override public void closeStaleConnections() { logger.debug("closeStaleConnections() count before expiring: {}", getConnectionCount()); - Map> map = Maps.newHashMap(connections.asMap()); - Set redisUris = getConnectionPointsOfAllNodes(); - for (PoolKey poolKey : map.keySet()) { - if (redisUris.contains(HostAndPort.fromParts(poolKey.host, poolKey.port))) { - continue; - } + Set stale = getStaleConnectionKeys(); - connections.invalidate(poolKey); - map.get(poolKey).close(); + for (ConnectionKey connectionKey : stale) { + StatefulRedisConnection connection = connections.getIfPresent(connectionKey); + if (connection != null) { + connections.invalidate(connectionKey); + connection.close(); + } } logger.debug("closeStaleConnections() count after expiring: {}", getConnectionCount()); } - protected Set getConnectionPointsOfAllNodes() { - Set redisUris = Sets.newHashSet(); + /** + * Retrieve a set of PoolKey's for all pooled connections that are within the pool but not within the {@link Partitions}. + * + * @return Set of {@link ConnectionKey}s + */ + private Set getStaleConnectionKeys() { + Map> map = Maps.newHashMap(connections.asMap()); + Set stale = Sets.newHashSet(); - for (RedisClusterNode partition : partitions) { - if (partition.getFlags().contains(RedisClusterNode.NodeFlag.MASTER)) { - redisUris.add(HostAndPort.fromParts(partition.getUri().getHost(), partition.getUri().getPort())); + for (ConnectionKey connectionKey : map.keySet()) { + + if (connectionKey.nodeId != null && partitions.getPartitionByNodeId(connectionKey.nodeId) != null) { + continue; } - if (partition.getFlags().contains(RedisClusterNode.NodeFlag.SLAVE)) { - redisUris.add(HostAndPort.fromParts(partition.getUri().getHost(), partition.getUri().getPort())); + if (connectionKey.host != null && getPartition(connectionKey.host, connectionKey.port) != null) { + continue; } + stale.add(connectionKey); } - return redisUris; + return stale; } + /** + * Set auto-flush on all commands. Synchronize on {@code stateLock} to initiate a happens-before relation and clear the + * thread caches of other threads. + * + * @param autoFlush state of autoFlush. + */ @Override public void setAutoFlushCommands(boolean autoFlush) { synchronized (stateLock) { @@ -260,14 +245,150 @@ public void flushCommands() { } } + /** + * + * @return number of connections. + */ protected long getConnectionCount() { return connections.size(); } - protected void resetPartitions() { - + /** + * Reset the internal writer cache. This is necessary because the {@link Partitions} have no reference to the writer cache. + * + * Synchronize on {@code stateLock} to initiate a happens-before relation and clear the thread caches of other threads. + */ + protected void resetWriterCache() { synchronized (stateLock) { Arrays.fill(writers, null); } } + + private RuntimeException invalidConnectionPoint(String message) { + return new IllegalArgumentException("Connection to " + message + + " not allowed. This connection point is not known in the cluster view"); + } + + private Supplier getSocketAddressSupplier(final ConnectionKey connectionKey) { + return new Supplier() { + @Override + public SocketAddress get() { + + if (connectionKey.nodeId != null) { + return getSocketAddress(connectionKey.nodeId); + } + return new InetSocketAddress(connectionKey.host, connectionKey.port); + } + + }; + } + + protected SocketAddress getSocketAddress(String nodeId) { + for (RedisClusterNode partition : partitions) { + if (partition.getNodeId().equals(nodeId)) { + return partition.getUri().getResolvedAddress(); + } + } + return null; + } + + /** + * Connection to identify a connection either by nodeId or host/port. + */ + private static class ConnectionKey { + private final ClusterConnectionProvider.Intent intent; + private final String nodeId; + private final String host; + private final int port; + + public ConnectionKey(Intent intent, String nodeId) { + this.intent = intent; + this.nodeId = nodeId; + this.host = null; + this.port = 0; + } + + public ConnectionKey(Intent intent, String host, int port) { + this.intent = intent; + this.host = host; + this.port = port; + this.nodeId = null; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (!(o instanceof ConnectionKey)) + return false; + + ConnectionKey key = (ConnectionKey) o; + + if (port != key.port) + return false; + if (intent != key.intent) + return false; + if (nodeId != null ? !nodeId.equals(key.nodeId) : key.nodeId != null) + return false; + return !(host != null ? !host.equals(key.host) : key.host != null); + } + + @Override + public int hashCode() { + int result = intent != null ? intent.hashCode() : 0; + result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0); + result = 31 * result + (host != null ? host.hashCode() : 0); + result = 31 * result + port; + return result; + } + } + + private class ConnectionFactory extends CacheLoader> { + + private final RedisClusterClient redisClusterClient; + private final RedisCodec redisCodec; + private final RedisChannelWriter clusterWriter; + + public ConnectionFactory(RedisClusterClient redisClusterClient, RedisCodec redisCodec, + RedisChannelWriter clusterWriter) { + this.redisClusterClient = redisClusterClient; + this.redisCodec = redisCodec; + this.clusterWriter = clusterWriter; + } + + @Override + public StatefulRedisConnection load(ConnectionKey key) throws Exception { + + StatefulRedisConnection connection = null; + if (key.nodeId != null) { + if (partitions.getPartitionByNodeId(key.nodeId) == null) { + throw invalidConnectionPoint("node id " + key.nodeId); + } + + // NodeId connections provide command recovery due to cluster reconfiguration + connection = redisClusterClient.connectToNode(redisCodec, key.nodeId, clusterWriter, + getSocketAddressSupplier(key)); + } + + if (key.host != null) { + if (getPartition(key.host, key.port) == null) { + throw invalidConnectionPoint(key.host + ":" + key.port); + } + + // Host and port connections do not provide command recovery due to cluster reconfiguration + connection = redisClusterClient.connectToNode(redisCodec, key.host + ":" + key.port, null, + getSocketAddressSupplier(key)); + } + + if (key.intent == Intent.READ) { + connection.sync().readOnly(); + } + + synchronized (stateLock) { + connection.setAutoFlushCommands(autoFlushCommands); + } + + return connection; + } + } } diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterAsyncConnection.java b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterAsyncConnection.java index f1dc5f9ad1..9b0979d280 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterAsyncConnection.java +++ b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterAsyncConnection.java @@ -15,13 +15,16 @@ public interface RedisAdvancedClusterAsyncConnection extends RedisClusterAsyncConnection { /** - * Retrieve a connection to the specified cluster node using the nodeId. Host and port are looked up in the node list. - * + * Retrieve a connection to the specified cluster node using the nodeId. Host and port are looked up in the node list. This + * connection is bound to the node id. Once the cluster topology view is updated, the connection will try to reconnect the + * to the node with the specified {@code nodeId}, that behavior can also lead to a closed connection once the node with the + * specified {@code nodeId} is no longer part of the cluster. + * * Do not close the connections. Otherwise, unpredictable behavior will occur. The nodeId must be part of the cluster and is * validated against the current topology view in {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. - * + * * In contrast to the {@link RedisAdvancedClusterAsyncConnection}, node-connections do not route commands to other cluster - * nodes + * nodes. * * @param nodeId the node Id * @return a connection to the requested cluster node @@ -30,10 +33,14 @@ public interface RedisAdvancedClusterAsyncConnection extends RedisClusterA RedisClusterAsyncConnection getConnection(String nodeId); /** - * Retrieve a connection to the specified cluster node using the nodeId. Do not close the connections. Otherwise, - * unpredictable behavior will occur. The node must be part of the cluster and host/port are validated (exact check) against - * the current topology view in {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. + * Retrieve a connection to the specified cluster node using the nodeId. This connection is bound to a host and port. + * Updates to the cluster topology view can close the connection once the host, identified by {@code host} and {@code port}, + * are no longer part of the cluster. * + * Do not close the connections. Otherwise, unpredictable behavior will occur. The node must be part of the cluster and + * host/port are validated (exact check) against the current topology view in + * {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. + * * In contrast to the {@link RedisAdvancedClusterAsyncConnection}, node-connections do not route commands to other cluster * nodes. * diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnection.java b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnection.java index 58ff6adad8..1edc10ec56 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnection.java +++ b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnection.java @@ -15,13 +15,16 @@ public interface RedisAdvancedClusterConnection extends RedisClusterConnection { /** - * Retrieve a connection to the specified cluster node using the nodeId. Host and port are looked up in the node list. + * Retrieve a connection to the specified cluster node using the nodeId. Host and port are looked up in the node list. This + * connection is bound to the node id. Once the cluster topology view is updated, the connection will try to reconnect the + * to the node with the specified {@code nodeId}, that behavior can also lead to a closed connection once the node with the + * specified {@code nodeId} is no longer part of the cluster. * * Do not close the connections. Otherwise, unpredictable behavior will occur. The nodeId must be part of the cluster and is * validated against the current topology view in {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. * * In contrast to the {@link RedisAdvancedClusterConnection}, node-connections do not route commands to other cluster nodes. - * + * * @param nodeId the node Id * @return a connection to the requested cluster node * @throws RedisException if the requested node identified by {@code nodeId} is not part of the cluster @@ -29,9 +32,13 @@ public interface RedisAdvancedClusterConnection extends RedisClusterConnec RedisClusterConnection getConnection(String nodeId); /** - * Retrieve a connection to the specified cluster node using the nodeId. Do not close the connections. Otherwise, - * unpredictable behavior will occur. The node must be part of the cluster and host/port are validated (exact check) against - * the current topology view in {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. + * Retrieve a connection to the specified cluster node using the nodeId. This connection is bound to a host and port. + * Updates to the cluster topology view can close the connection once the host, identified by {@code host} and {@code port}, + * are no longer part of the cluster. + * + * Do not close the connections. Otherwise, unpredictable behavior will occur. The node must be part of the cluster and + * host/port are validated (exact check) against the current topology view in + * {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. * * In contrast to the {@link RedisAdvancedClusterConnection}, node-connections do not route commands to other cluster nodes. * diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java index c91f9ef9bf..0cf9cacac6 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java +++ b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java @@ -11,7 +11,6 @@ import java.util.List; import java.util.Map; import java.util.Queue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Predicate; @@ -200,28 +199,37 @@ public RedisAdvancedClusterAsyncCommands connectClusterAsync(RedisC return connectClusterImpl(codec, getSocketAddressSupplier()).async(); } - protected StatefulRedisConnection connectToNode(SocketAddress socketAddress) { - return connectToNode(newStringStringCodec(), socketAddress); + protected StatefulRedisConnection connectToNode(final SocketAddress socketAddress) { + return connectToNode(newStringStringCodec(), socketAddress.toString(), null, new Supplier() { + @Override + public SocketAddress get() { + return socketAddress; + } + }); } /** * Create a connection to a redis socket address. + * + * @param codec Use this codec to encode/decode keys and values. + * @param nodeId the nodeId + * @param clusterWriter global cluster writer + * @param socketAddressSupplier supplier for the socket address * - * @param socketAddress initial connect * @param Key type. * @param Value type. * @return a new connection */ - StatefulRedisConnection connectToNode(RedisCodec codec, final SocketAddress socketAddress) { - - logger.debug("connectAsyncImpl(" + socketAddress + ")"); - Queue> queue = new ArrayDeque>(); + StatefulRedisConnection connectToNode(RedisCodec codec, String nodeId, + RedisChannelWriter clusterWriter, final Supplier socketAddressSupplier) { - CommandHandler handler = new CommandHandler(clientOptions, queue); + logger.debug("connectNode(" + nodeId + ")"); + Queue> queue = new ArrayDeque<>(); + ClusterNodeCommandHandler handler = new ClusterNodeCommandHandler(clientOptions, queue, clusterWriter); StatefulRedisConnectionImpl connection = new StatefulRedisConnectionImpl(handler, codec, timeout, unit); - connectAsyncImpl(handler, connection, () -> socketAddress); + connectAsyncImpl(handler, connection, socketAddressSupplier); connection.registerCloseables(closeableResources, connection); @@ -254,12 +262,13 @@ StatefulRedisClusterConnectionImpl connectClusterImpl(RedisCodec handler = new CommandHandler(clientOptions, queue); - final PooledClusterConnectionProvider pooledClusterConnectionProvider = new PooledClusterConnectionProvider( - this, codec); + ClusterDistributionChannelWriter clusterWriter = new ClusterDistributionChannelWriter(handler); + PooledClusterConnectionProvider pooledClusterConnectionProvider = new PooledClusterConnectionProvider(this, + clusterWriter, codec); + + clusterWriter.setClusterConnectionProvider(pooledClusterConnectionProvider); - final ClusterDistributionChannelWriter clusterWriter = new ClusterDistributionChannelWriter(handler, - pooledClusterConnectionProvider); - StatefulRedisClusterConnectionImpl connection = new StatefulRedisClusterConnectionImpl<>(clusterWriter, codec, + StatefulRedisClusterConnectionImpl connection = new StatefulRedisClusterConnectionImpl(clusterWriter, codec, timeout, unit); connection.setPartitions(partitions); @@ -392,6 +401,11 @@ protected Utf8StringCodec newStringStringCodec() { return new Utf8StringCodec(); } + /** + * Sets the new cluster topology. The partitions are not applied to existing connections. + * + * @param partitions partitions object + */ public void setPartitions(Partitions partitions) { this.partitions = partitions; } diff --git a/src/main/java/com/lambdaworks/redis/cluster/api/StatefulRedisClusterConnection.java b/src/main/java/com/lambdaworks/redis/cluster/api/StatefulRedisClusterConnection.java index 08b19751cf..028965f2b1 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/api/StatefulRedisClusterConnection.java +++ b/src/main/java/com/lambdaworks/redis/cluster/api/StatefulRedisClusterConnection.java @@ -3,8 +3,6 @@ import com.lambdaworks.redis.RedisException; import com.lambdaworks.redis.api.StatefulConnection; import com.lambdaworks.redis.api.StatefulRedisConnection; -import com.lambdaworks.redis.cluster.RedisAdvancedClusterAsyncConnection; -import com.lambdaworks.redis.cluster.RedisAdvancedClusterConnection; import com.lambdaworks.redis.cluster.api.async.RedisAdvancedClusterAsyncCommands; import com.lambdaworks.redis.cluster.api.rx.RedisAdvancedClusterReactiveCommands; import com.lambdaworks.redis.cluster.api.sync.RedisAdvancedClusterCommands; @@ -44,13 +42,16 @@ public interface StatefulRedisClusterConnection extends StatefulConnection RedisAdvancedClusterReactiveCommands reactive(); /** - * Retrieve a connection to the specified cluster node using the nodeId. Host and port are looked up in the node list. + * Retrieve a connection to the specified cluster node using the nodeId. Host and port are looked up in the node list. This + * connection is bound to the node id. Once the cluster topology view is updated, the connection will try to reconnect the + * to the node with the specified {@code nodeId}, that behavior can also lead to a closed connection once the node with the + * specified {@code nodeId} is no longer part of the cluster. * * Do not close the connections. Otherwise, unpredictable behavior will occur. The nodeId must be part of the cluster and is * validated against the current topology view in {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. + * * - * In contrast to the {@link StatefulRedisClusterConnection}, node-connections do not route commands to other cluster - * nodes. + * In contrast to the {@link StatefulRedisClusterConnection}, node-connections do not route commands to other cluster nodes. * * @param nodeId the node Id * @return a connection to the requested cluster node @@ -59,12 +60,15 @@ public interface StatefulRedisClusterConnection extends StatefulConnection StatefulRedisConnection getConnection(String nodeId); /** - * Retrieve a connection to the specified cluster node using the nodeId. Do not close the connections. Otherwise, - * unpredictable behavior will occur. The node must be part of the cluster and host/port are validated (exact check) against - * the current topology view in {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. + * Retrieve a connection to the specified cluster node using the nodeId. This connection is bound to a host and port. + * Updates to the cluster topology view can close the connection once the host, identified by {@code host} and {@code port}, + * are no longer part of the cluster. * - * In contrast to the {@link StatefulRedisClusterConnection}, node-connections do not route commands to other cluster - * nodes. + * Do not close the connections. Otherwise, unpredictable behavior will occur. The node must be part of the cluster and + * host/port are validated (exact check) against the current topology view in + * {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. + * + * In contrast to the {@link StatefulRedisClusterConnection}, node-connections do not route commands to other cluster nodes. * * @param host the host * @param port the port diff --git a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java index a88af243e0..065ccd0753 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java +++ b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java @@ -48,10 +48,10 @@ public class CommandHandler extends ChannelDuplexHandler implements RedisC protected Queue> commandBuffer = new ArrayDeque>(); protected ByteBuf buffer; protected RedisStateMachine rsm; + protected Channel channel; private LifecycleState lifecycleState = LifecycleState.NOT_CONNECTED; private Object stateLock = new Object(); - private Channel channel; /** * If TRACE level logging has been enabled at startup. @@ -389,6 +389,10 @@ protected void setState(LifecycleState lifecycleState) { } } + protected LifecycleState getState() { + return lifecycleState; + } + private void cancelCommands(String message) { int size = 0; if (queue != null) { @@ -511,7 +515,7 @@ public void setAutoFlushCommands(boolean autoFlush) { } } - private String logPrefix() { + protected String logPrefix() { if (logPrefix != null) { return logPrefix; } @@ -520,9 +524,7 @@ private String logPrefix() { return logPrefix = buffer.toString(); } - @VisibleForTesting - enum LifecycleState { - + public enum LifecycleState { NOT_CONNECTED, REGISTERED, CONNECTED, ACTIVATING, ACTIVE, DISCONNECTED, DEACTIVATING, DEACTIVATED, CLOSED, }