From 3d2fc861776583e4b003db960fb08f203a85b08c Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Sun, 26 Jul 2015 12:56:57 +0200 Subject: [PATCH] NodeId-bound cluster connections #104 Motivation: Redis cluster nodes can be identified by three-and-a-half means: * NodeId * Host and Port * Slot ** The half: Master/slave state for a certain slot The identification details can be moved/changed at runtime. Most prominent examples are slots and master/slave state. A certain nodeId can be moved as well from one host/port to another one. The previous implementation did not care too much about that fact; all connections were identified by host and port. While moving a certain nodeId from one host to another is quite unlikely, it still might happen. The connection pool, therefore, distinguishes now between host and port-bound and nodeId-bound connections. Host and port-bound connections stick to the particular host/port. NodeId-bound connections are reconfigured once the cluster topology changes. Another effect of the change is, the connection management can double the number connections because connections are not shared amongst the identifier classes. --- .../ClusterDistributionChannelWriter.java | 9 +- .../cluster/ClusterNodeCommandHandler.java | 107 ++++++ .../PooledClusterConnectionProvider.java | 344 ++++++++++++------ .../RedisAdvancedClusterAsyncConnection.java | 15 +- .../RedisAdvancedClusterConnection.java | 21 +- .../redis/cluster/RedisClusterClient.java | 44 ++- .../redis/protocol/CommandHandler.java | 14 +- 7 files changed, 402 insertions(+), 152 deletions(-) create mode 100644 src/main/java/com/lambdaworks/redis/cluster/ClusterNodeCommandHandler.java diff --git a/src/main/java/com/lambdaworks/redis/cluster/ClusterDistributionChannelWriter.java b/src/main/java/com/lambdaworks/redis/cluster/ClusterDistributionChannelWriter.java index ee9d832911..987981ed3a 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/ClusterDistributionChannelWriter.java +++ b/src/main/java/com/lambdaworks/redis/cluster/ClusterDistributionChannelWriter.java @@ -10,6 +10,7 @@ import com.lambdaworks.redis.RedisAsyncConnectionImpl; import com.lambdaworks.redis.RedisChannelHandler; import com.lambdaworks.redis.RedisChannelWriter; +import com.lambdaworks.redis.cluster.models.partitions.Partitions; import com.lambdaworks.redis.protocol.Command; import com.lambdaworks.redis.protocol.CommandArgs; import com.lambdaworks.redis.protocol.CommandKeyword; @@ -30,10 +31,8 @@ class ClusterDistributionChannelWriter implements RedisChannelWriter private boolean closed = false; private int executionLimit = 5; - public ClusterDistributionChannelWriter(RedisChannelWriter defaultWriter, - ClusterConnectionProvider clusterConnectionProvider) { + public ClusterDistributionChannelWriter(RedisChannelWriter defaultWriter) { this.defaultWriter = defaultWriter; - this.clusterConnectionProvider = clusterConnectionProvider; } @Override @@ -166,4 +165,8 @@ public void reset() { defaultWriter.reset(); clusterConnectionProvider.reset(); } + + public void setClusterConnectionProvider(ClusterConnectionProvider clusterConnectionProvider) { + this.clusterConnectionProvider = clusterConnectionProvider; + } } diff --git a/src/main/java/com/lambdaworks/redis/cluster/ClusterNodeCommandHandler.java b/src/main/java/com/lambdaworks/redis/cluster/ClusterNodeCommandHandler.java new file mode 100644 index 0000000000..69c1ea1189 --- /dev/null +++ b/src/main/java/com/lambdaworks/redis/cluster/ClusterNodeCommandHandler.java @@ -0,0 +1,107 @@ +package com.lambdaworks.redis.cluster; + +import java.util.Queue; +import java.util.Set; + +import com.google.common.collect.ImmutableSet; +import com.lambdaworks.redis.ClientOptions; +import com.lambdaworks.redis.RedisChannelWriter; +import com.lambdaworks.redis.RedisException; +import com.lambdaworks.redis.protocol.CommandHandler; +import com.lambdaworks.redis.protocol.ConnectionWatchdog; +import com.lambdaworks.redis.protocol.RedisCommand; + +import io.netty.channel.ChannelHandler; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +/** + * @author Mark Paluch + */ +@ChannelHandler.Sharable +class ClusterNodeCommandHandler extends CommandHandler { + + private static final InternalLogger logger = InternalLoggerFactory.getInstance(ClusterNodeCommandHandler.class); + private static final Set CHANNEL_OPEN_STATES = ImmutableSet.of(LifecycleState.ACTIVATING, + LifecycleState.ACTIVE, LifecycleState.CONNECTED); + + private final RedisChannelWriter clusterChannelWriter; + + /** + * Initialize a new instance that handles commands from the supplied queue. + * + * @param clientOptions client options for this connection + * @param queue The command queue + * @param clusterChannelWriter top-most channel writer. + */ + public ClusterNodeCommandHandler(ClientOptions clientOptions, Queue> queue, + RedisChannelWriter clusterChannelWriter) { + super(clientOptions, queue); + this.clusterChannelWriter = clusterChannelWriter; + } + + /** + * Prepare the closing of the channel. + */ + public void prepareClose() { + if (channel != null) { + ConnectionWatchdog connectionWatchdog = channel.pipeline().get(ConnectionWatchdog.class); + if (connectionWatchdog != null) { + connectionWatchdog.setReconnectSuspended(true); + } + } + } + + /** + * Move queued and buffered commands from the inactive connection to the master command writer. This is done only if the + * current connection is disconnected and auto-reconnect is enabled (command-retries). If the connection would be open, we + * could get into a race that the commands we're moving are right now in processing. Alive connections can handle redirects + * and retries on their own. + */ + @Override + public void close() { + + logger.debug("{} close()", logPrefix()); + + if (clusterChannelWriter != null) { + if (isAutoReconnect() && !CHANNEL_OPEN_STATES.contains(getState())) { + for (RedisCommand queuedCommand : queue) { + try { + clusterChannelWriter.write(queuedCommand); + } catch (RedisException e) { + queuedCommand.setException(e); + queuedCommand.complete(); + } + } + + queue.clear(); + } + + for (RedisCommand queuedCommand : commandBuffer) { + try { + clusterChannelWriter.write(queuedCommand); + } catch (RedisException e) { + queuedCommand.setException(e); + queuedCommand.complete(); + } + } + + commandBuffer.clear(); + } + + super.close(); + } + + public boolean isAutoReconnect() { + return clientOptions.isAutoReconnect(); + } + + public boolean isQueueEmpty() { + if (queue.isEmpty() && commandBuffer.isEmpty()) { + return true; + } + + return false; + } + +} diff --git a/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java b/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java index 8bceefd09a..c88c6ad4ca 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java +++ b/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java @@ -6,6 +6,7 @@ import java.util.Map; import java.util.Set; +import com.google.common.base.Supplier; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -13,9 +14,9 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.net.HostAndPort; -import com.lambdaworks.redis.LettuceStrings; import com.lambdaworks.redis.RedisAsyncConnection; import com.lambdaworks.redis.RedisAsyncConnectionImpl; +import com.lambdaworks.redis.RedisChannelWriter; import com.lambdaworks.redis.RedisException; import com.lambdaworks.redis.RedisURI; import com.lambdaworks.redis.cluster.models.partitions.Partitions; @@ -36,7 +37,8 @@ class PooledClusterConnectionProvider implements ClusterConnectionProvider { private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledClusterConnectionProvider.class); - private final LoadingCache> connections; + // Contains NodeId-identified and HostAndPort-identified connections. + private final LoadingCache> connections; private final boolean debugEnabled; private final RedisAsyncConnectionImpl writers[] = new RedisAsyncConnectionImpl[SlotHash.SLOT_COUNT]; private Partitions partitions; @@ -44,28 +46,11 @@ class PooledClusterConnectionProvider implements ClusterConnectionProvider private boolean autoFlushCommands = true; private Object stateLock = new Object(); - public PooledClusterConnectionProvider(final RedisClusterClient redisClusterClient, final RedisCodec redisCodec) { + public PooledClusterConnectionProvider(RedisClusterClient redisClusterClient, RedisChannelWriter clusterWriter, + RedisCodec redisCodec) { this.debugEnabled = logger.isDebugEnabled(); - this.connections = CacheBuilder.newBuilder().build(new CacheLoader>() { - @Override - public RedisAsyncConnectionImpl load(PoolKey key) throws Exception { - - Set redisUris = getConnectionPointsOfAllNodes(); - HostAndPort hostAndPort = HostAndPort.fromParts(key.host, key.port); - - if (!redisUris.contains(hostAndPort)) { - throw new IllegalArgumentException("Connection to " + hostAndPort - + " not allowed. This connection point is not known in the cluster view"); - } - - RedisAsyncConnectionImpl connection = redisClusterClient.connectAsyncImpl(redisCodec, - key.getSocketAddress()); - synchronized (stateLock) { - connection.getChannelWriter().setAutoFlushCommands(autoFlushCommands); - } - return connection; - } - }); + this.connections = CacheBuilder.newBuilder().build( + new ConnectionFactory(redisClusterClient, redisCodec, clusterWriter)); } @Override @@ -75,7 +60,13 @@ public RedisAsyncConnectionImpl getConnection(Intent intent, int slot) { logger.debug("getConnection(" + intent + ", " + slot + ")"); } - RedisAsyncConnectionImpl writer = writers[slot]; + RedisAsyncConnectionImpl writer; + + // avoid races when reconfiguring partitions. + synchronized (stateLock) { + writer = writers[slot]; + } + if (writer == null) { RedisClusterNode partition = partitions.getPartitionBySlot(slot); if (partition == null) { @@ -83,8 +74,8 @@ public RedisAsyncConnectionImpl getConnection(Intent intent, int slot) { } try { - PoolKey key = new PoolKey(intent, partition.getUri()); - return writers[slot] = getConnection(key); + ConnectionKey key = new ConnectionKey(intent, partition.getNodeId()); + return writers[slot] = connections.get(key); } catch (Exception e) { throw new RedisException(e); } @@ -92,9 +83,7 @@ public RedisAsyncConnectionImpl getConnection(Intent intent, int slot) { return writer; } - private RedisAsyncConnectionImpl getConnection(PoolKey key) throws java.util.concurrent.ExecutionException { - return connections.get(key); - } + @Override @SuppressWarnings({ "unchecked", "hiding", "rawtypes" }) @@ -103,18 +92,36 @@ public RedisAsyncConnectionImpl getConnection(Intent intent, String host, if (debugEnabled) { logger.debug("getConnection(" + intent + ", " + host + ", " + port + ")"); } - PoolKey key = new PoolKey(intent, host, port); - return getConnection(key); + + RedisClusterNode redisClusterNode = getPartition(host, port); + + if (redisClusterNode == null) { + HostAndPort hostAndPort = HostAndPort.fromParts(host, port); + throw invalidConnectionPoint(hostAndPort.toString()); + } + + ConnectionKey key = new ConnectionKey(intent, host, port); + return connections.get(key); } catch (Exception e) { throw new RedisException(e); } } + private RedisClusterNode getPartition(String host, int port) { + for (RedisClusterNode partition : partitions) { + RedisURI uri = partition.getUri(); + if (port == uri.getPort() && host.equals(uri.getHost())) { + return partition; + } + } + return null; + } + @Override public void close() { - ImmutableMap> copy = ImmutableMap.copyOf(this.connections.asMap()); + ImmutableMap> copy = ImmutableMap.copyOf(this.connections.asMap()); this.connections.invalidateAll(); - resetPartitions(); + resetWriterCache(); for (RedisAsyncConnection kvRedisAsyncConnection : copy.values()) { if (kvRedisAsyncConnection.isOpen()) { kvRedisAsyncConnection.close(); @@ -124,116 +131,95 @@ public void close() { @Override public void reset() { - ImmutableMap> copy = ImmutableMap.copyOf(this.connections.asMap()); + ImmutableMap> copy = ImmutableMap.copyOf(this.connections.asMap()); for (RedisAsyncConnectionImpl kvRedisAsyncConnection : copy.values()) { kvRedisAsyncConnection.reset(); } } - private static class PoolKey { - private final ClusterConnectionProvider.Intent intent; - private SocketAddress socketAddress; - private final String host; - private final int port; - - private PoolKey(ClusterConnectionProvider.Intent intent, RedisURI uri) { - this.intent = intent; - this.host = uri.getHost(); - this.port = uri.getPort(); - this.socketAddress = uri.getResolvedAddress(); - } - - private PoolKey(Intent intent, String host, int port) { - this.intent = intent; - this.host = host; - this.port = port; - } - - public ClusterConnectionProvider.Intent getIntent() { - return intent; + /** + * Synchronize on {@code stateLock} to initiate a happens-before relation and clear the thread caches of other threads. + * + * @param partitions the new partitions. + */ + @Override + public void setPartitions(Partitions partitions) { + synchronized (stateLock) { + this.partitions = partitions; + reconfigurePartitions(); } + } - public SocketAddress getSocketAddress() { + private void reconfigurePartitions() { + Set staleConnections = getStaleConnectionKeys(); - if (socketAddress == null && LettuceStrings.isNotEmpty(host)) { - socketAddress = new InetSocketAddress(host, port); + for (ConnectionKey key : staleConnections) { + RedisAsyncConnectionImpl connection = connections.getIfPresent(key); + if (connection.getChannelWriter() instanceof ClusterNodeCommandHandler) { + ClusterNodeCommandHandler clusterNodeCommandHandler = (ClusterNodeCommandHandler) connection + .getChannelWriter(); + clusterNodeCommandHandler.prepareClose(); } - - return socketAddress; } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof PoolKey)) { - return false; - } - - PoolKey poolKey = (PoolKey) o; - - if (port != poolKey.port) { - return false; - } - if (host != null ? !host.equals(poolKey.host) : poolKey.host != null) { - return false; - } - if (intent != poolKey.intent) { - return false; - } - - return true; - } + resetWriterCache(); - @Override - public int hashCode() { - int result = intent != null ? intent.hashCode() : 0; - result = 31 * result + (host != null ? host.hashCode() : 0); - result = 31 * result + port; - return result; + for (ConnectionKey key : staleConnections) { + RedisAsyncConnectionImpl connection = connections.getIfPresent(key); + connection.close(); + connections.invalidate(key); } } - @Override - public void setPartitions(Partitions partitions) { - this.partitions = partitions; - resetPartitions(); - } - + /** + * Close stale connections. + */ @Override public void closeStaleConnections() { logger.debug("closeStaleConnections() count before expiring: {}", getConnectionCount()); - Map> map = Maps.newHashMap(connections.asMap()); - Set redisUris = getConnectionPointsOfAllNodes(); - for (PoolKey poolKey : map.keySet()) { - if (redisUris.contains(HostAndPort.fromParts(poolKey.host, poolKey.port))) { - continue; - } + Set stale = getStaleConnectionKeys(); - connections.invalidate(poolKey); - map.get(poolKey).close(); + for (ConnectionKey connectionKey : stale) { + RedisAsyncConnectionImpl connection = connections.getIfPresent(connectionKey); + if (connection != null) { + connections.invalidate(connectionKey); + connection.close(); + } } logger.debug("closeStaleConnections() count after expiring: {}", getConnectionCount()); } - protected Set getConnectionPointsOfAllNodes() { - Set redisUris = Sets.newHashSet(); + /** + * Retrieve a set of PoolKey's for all pooled connections that are within the pool but not within the {@link Partitions}. + * + * @return Set of {@link ConnectionKey}s + */ + private Set getStaleConnectionKeys() { + Map> map = Maps.newHashMap(connections.asMap()); + Set stale = Sets.newHashSet(); - for (RedisClusterNode partition : partitions) { - if (partition.getFlags().contains(RedisClusterNode.NodeFlag.MASTER)) { - redisUris.add(HostAndPort.fromParts(partition.getUri().getHost(), partition.getUri().getPort())); + for (ConnectionKey connectionKey : map.keySet()) { + + if (connectionKey.nodeId != null && partitions.getPartitionByNodeId(connectionKey.nodeId) != null) { + continue; } - if (partition.getFlags().contains(RedisClusterNode.NodeFlag.SLAVE)) { - redisUris.add(HostAndPort.fromParts(partition.getUri().getHost(), partition.getUri().getPort())); + if (connectionKey.host != null && getPartition(connectionKey.host, connectionKey.port) != null) { + continue; } + stale.add(connectionKey); } - return redisUris; + return stale; } + /** + * Set auto-flush on all commands. Synchronize on {@code stateLock} to initiate a happens-before relation and clear the + * thread caches of other threads. + * + * @param autoFlush state of autoFlush. + */ @Override public void setAutoFlushCommands(boolean autoFlush) { synchronized (stateLock) { @@ -246,20 +232,150 @@ public void setAutoFlushCommands(boolean autoFlush) { @Override public void flushCommands() { - for (RedisAsyncConnectionImpl connection : connections.asMap().values()) { connection.getChannelWriter().flushCommands(); } } + /** + * + * @return number of connections. + */ protected long getConnectionCount() { return connections.size(); } - protected void resetPartitions() { - + /** + * Reset the internal writer cache. This is necessary because the {@link Partitions} have no reference to the writer cache. + * + * Synchronize on {@code stateLock} to initiate a happens-before relation and clear the thread caches of other threads. + */ + protected void resetWriterCache() { synchronized (stateLock) { Arrays.fill(writers, null); } } + + private RuntimeException invalidConnectionPoint(String message) { + return new IllegalArgumentException("Connection to " + message + + " not allowed. This connection point is not known in the cluster view"); + } + + private Supplier getSocketAddressSupplier(final ConnectionKey connectionKey) { + return new Supplier() { + @Override + public SocketAddress get() { + + if (connectionKey.nodeId != null) { + return getSocketAddress(connectionKey.nodeId); + } + return new InetSocketAddress(connectionKey.host, connectionKey.port); + } + + }; + } + + protected SocketAddress getSocketAddress(String nodeId) { + for (RedisClusterNode partition : partitions) { + if (partition.getNodeId().equals(nodeId)) { + return partition.getUri().getResolvedAddress(); + } + } + return null; + } + + /** + * Connection to identify a connection either by nodeId or host/port. + */ + private static class ConnectionKey { + private final ClusterConnectionProvider.Intent intent; + private final String nodeId; + private final String host; + private final int port; + + public ConnectionKey(Intent intent, String nodeId) { + this.intent = intent; + this.nodeId = nodeId; + this.host = null; + this.port = 0; + } + + public ConnectionKey(Intent intent, String host, int port) { + this.intent = intent; + this.host = host; + this.port = port; + this.nodeId = null; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (!(o instanceof ConnectionKey)) + return false; + + ConnectionKey key = (ConnectionKey) o; + + if (port != key.port) + return false; + if (intent != key.intent) + return false; + if (nodeId != null ? !nodeId.equals(key.nodeId) : key.nodeId != null) + return false; + return !(host != null ? !host.equals(key.host) : key.host != null); + } + + @Override + public int hashCode() { + int result = intent != null ? intent.hashCode() : 0; + result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0); + result = 31 * result + (host != null ? host.hashCode() : 0); + result = 31 * result + port; + return result; + } + } + + private class ConnectionFactory extends CacheLoader> { + + private final RedisClusterClient redisClusterClient; + private final RedisCodec redisCodec; + private final RedisChannelWriter clusterWriter; + + public ConnectionFactory(RedisClusterClient redisClusterClient, RedisCodec redisCodec, + RedisChannelWriter clusterWriter) { + this.redisClusterClient = redisClusterClient; + this.redisCodec = redisCodec; + this.clusterWriter = clusterWriter; + } + + @Override + public RedisAsyncConnectionImpl load(ConnectionKey key) throws Exception { + + RedisAsyncConnectionImpl connection = null; + if (key.nodeId != null) { + if (partitions.getPartitionByNodeId(key.nodeId) == null) { + throw invalidConnectionPoint("node id " + key.nodeId); + } + + // NodeId connections provide command recovery due to cluster reconfiguration + connection = redisClusterClient.connectNode(redisCodec, key.nodeId, clusterWriter, + getSocketAddressSupplier(key)); + } + + if (key.host != null) { + if (getPartition(key.host, key.port) == null) { + throw invalidConnectionPoint(key.host + ":" + key.port); + } + + // Host and port connections do not provide command recovery due to cluster reconfiguration + connection = redisClusterClient.connectNode(redisCodec, key.host + ":" + key.port, null, + getSocketAddressSupplier(key)); + } + + synchronized (stateLock) { + connection.getChannelWriter().setAutoFlushCommands(autoFlushCommands); + } + return connection; + } + } } diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterAsyncConnection.java b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterAsyncConnection.java index bb8e83e514..a4e2642c7f 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterAsyncConnection.java +++ b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterAsyncConnection.java @@ -12,7 +12,10 @@ public interface RedisAdvancedClusterAsyncConnection extends RedisClusterAsyncConnection { /** - * Retrieve a connection to the specified cluster node using the nodeId. Host and port are looked up in the node list. + * Retrieve a connection to the specified cluster node using the nodeId. Host and port are looked up in the node list. This + * connection is bound to the node id. Once the cluster topology view is updated, the connection will try to reconnect the + * to the node with the specified {@code nodeId}, that behavior can also lead to a closed connection once the node with the + * specified {@code nodeId} is no longer part of the cluster. * * Do not close the connections. Otherwise, unpredictable behavior will occur. The nodeId must be part of the cluster and is * validated against the current topology view in {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. @@ -27,9 +30,13 @@ public interface RedisAdvancedClusterAsyncConnection extends RedisClusterA RedisClusterAsyncConnection getConnection(String nodeId); /** - * Retrieve a connection to the specified cluster node using the nodeId. Do not close the connections. Otherwise, - * unpredictable behavior will occur. The node must be part of the cluster and host/port are validated (exact check) against - * the current topology view in {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. + * Retrieve a connection to the specified cluster node using the nodeId. This connection is bound to a host and port. + * Updates to the cluster topology view can close the connection once the host, identified by {@code host} and {@code port}, + * are no longer part of the cluster. + * + * Do not close the connections. Otherwise, unpredictable behavior will occur. The node must be part of the cluster and + * host/port are validated (exact check) against the current topology view in + * {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. * * In contrast to the {@link RedisAdvancedClusterAsyncConnection}, node-connections do not route commands to other cluster * nodes. diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnection.java b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnection.java index 821b63aa1a..3791467fd2 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnection.java +++ b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnection.java @@ -12,13 +12,15 @@ public interface RedisAdvancedClusterConnection extends RedisClusterConnection { /** - * Retrieve a connection to the specified cluster node using the nodeId. Host and port are looked up in the node list. + * Retrieve a connection to the specified cluster node using the nodeId. Host and port are looked up in the node list. This + * connection is bound to the node id. Once the cluster topology view is updated, the connection will try to reconnect the + * to the node with the specified {@code nodeId}, that behavior can also lead to a closed connection once the node with the + * specified {@code nodeId} is no longer part of the cluster. * * Do not close the connections. Otherwise, unpredictable behavior will occur. The nodeId must be part of the cluster and is * validated against the current topology view in {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. * - * In contrast to the {@link RedisAdvancedClusterConnection}, node-connections do not route commands to other cluster - * nodes. + * In contrast to the {@link RedisAdvancedClusterConnection}, node-connections do not route commands to other cluster nodes. * * @param nodeId the node Id * @return a connection to the requested cluster node @@ -27,12 +29,15 @@ public interface RedisAdvancedClusterConnection extends RedisClusterConnec RedisClusterConnection getConnection(String nodeId); /** - * Retrieve a connection to the specified cluster node using the nodeId. Do not close the connections. Otherwise, - * unpredictable behavior will occur. The node must be part of the cluster and host/port are validated (exact check) against - * the current topology view in {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. + * Retrieve a connection to the specified cluster node using the nodeId. This connection is bound to a host and port. + * Updates to the cluster topology view can close the connection once the host, identified by {@code host} and {@code port}, + * are no longer part of the cluster. + * + * Do not close the connections. Otherwise, unpredictable behavior will occur. The node must be part of the cluster and + * host/port are validated (exact check) against the current topology view in + * {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. * - * In contrast to the {@link RedisAdvancedClusterConnection}, node-connections do not route commands to other cluster - * nodes. + * In contrast to the {@link RedisAdvancedClusterConnection}, node-connections do not route commands to other cluster nodes. * * @param host the host * @param port the port diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java index 6255e722e2..16406247ff 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java +++ b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java @@ -44,7 +44,6 @@ public class RedisClusterClient extends AbstractRedisClient { private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisClusterClient.class); - protected AtomicBoolean clusterTopologyRefreshActivated = new AtomicBoolean(false); private ClusterTopologyRefresh refresh = new ClusterTopologyRefresh(this); @@ -126,32 +125,37 @@ public RedisAdvancedClusterAsyncConnection connectClusterAsync(Redi return connectClusterAsyncImpl(codec, getSocketAddressSupplier()); } - protected RedisAsyncConnectionImpl connectAsyncImpl(SocketAddress socketAddress) { - return connectAsyncImpl(newStringStringCodec(), socketAddress); + protected RedisAsyncConnectionImpl connectAsyncImpl(final SocketAddress socketAddress) { + return connectNode(newStringStringCodec(), socketAddress.toString(), null, new Supplier() { + @Override + public SocketAddress get() { + return socketAddress; + } + }); } /** * Create a connection to a redis socket address. * - * @param socketAddress initial connect + * @param codec Use this codec to encode/decode keys and values. + * @param nodeId the nodeId + * @param clusterWriter global cluster writer + * @param socketAddressSupplier supplier for the socket address + * * @param Key type. * @param Value type. * @return a new connection */ - RedisAsyncConnectionImpl connectAsyncImpl(RedisCodec codec, final SocketAddress socketAddress) { + RedisAsyncConnectionImpl connectNode(RedisCodec codec, String nodeId, + RedisChannelWriter clusterWriter, final Supplier socketAddressSupplier) { - logger.debug("connectAsyncImpl(" + socketAddress + ")"); + logger.debug("connectNode(" + nodeId + ")"); Queue> queue = new ArrayDeque>(); - CommandHandler handler = new CommandHandler(clientOptions, queue); + ClusterNodeCommandHandler handler = new ClusterNodeCommandHandler(clientOptions, queue, clusterWriter); RedisAsyncConnectionImpl connection = newRedisAsyncConnectionImpl(handler, codec, timeout, unit); - connectAsyncImpl(handler, connection, new Supplier() { - @Override - public SocketAddress get() { - return socketAddress; - } - }); + connectAsyncImpl(handler, connection, socketAddressSupplier); connection.registerCloseables(closeableResources, connection); @@ -183,11 +187,12 @@ RedisAdvancedClusterAsyncConnectionImpl connectClusterAsyncImpl(Red CommandHandler handler = new CommandHandler(clientOptions, queue); - final PooledClusterConnectionProvider pooledClusterConnectionProvider = new PooledClusterConnectionProvider( - this, codec); + ClusterDistributionChannelWriter clusterWriter = new ClusterDistributionChannelWriter(handler); + PooledClusterConnectionProvider pooledClusterConnectionProvider = new PooledClusterConnectionProvider(this, + clusterWriter, codec); + + clusterWriter.setClusterConnectionProvider(pooledClusterConnectionProvider); - final ClusterDistributionChannelWriter clusterWriter = new ClusterDistributionChannelWriter(handler, - pooledClusterConnectionProvider); RedisAdvancedClusterAsyncConnectionImpl connection = newRedisAdvancedClusterAsyncConnectionImpl(clusterWriter, codec, timeout, unit); @@ -363,6 +368,11 @@ protected Utf8StringCodec newStringStringCodec() { return new Utf8StringCodec(); } + /** + * Sets the new cluster topology. The partitions are not applied to existing connections. + * + * @param partitions partitions object + */ public void setPartitions(Partitions partitions) { this.partitions = partitions; } diff --git a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java index 50bfeb3b8e..ec7b6d8012 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java +++ b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java @@ -48,10 +48,10 @@ public class CommandHandler extends ChannelDuplexHandler implements RedisC protected Queue> commandBuffer = new ArrayDeque>(); protected ByteBuf buffer; protected RedisStateMachine rsm; + protected Channel channel; private LifecycleState lifecycleState = LifecycleState.NOT_CONNECTED; private Object stateLock = new Object(); - private Channel channel; /** * If TRACE level logging has been enabled at startup. @@ -392,6 +392,10 @@ protected void setState(LifecycleState lifecycleState) { } } + protected LifecycleState getState() { + return lifecycleState; + } + private void cancelCommands(String message) { int size = 0; if (queue != null) { @@ -512,10 +516,10 @@ public void setRedisChannelHandler(RedisChannelHandler redisChannelHandler public void setAutoFlushCommands(boolean autoFlush) { synchronized (stateLock) { this.autoFlushCommands = autoFlush; - } + } } - private String logPrefix() { + protected String logPrefix() { if (logPrefix != null) { return logPrefix; } @@ -524,9 +528,7 @@ private String logPrefix() { return logPrefix = buffer.toString(); } - @VisibleForTesting - enum LifecycleState { - + public enum LifecycleState { NOT_CONNECTED, REGISTERED, CONNECTED, ACTIVATING, ACTIVE, DISCONNECTED, DEACTIVATING, DEACTIVATED, CLOSED, }