diff --git a/src/main/java/com/lambdaworks/redis/cluster/ClusterDistributionChannelWriter.java b/src/main/java/com/lambdaworks/redis/cluster/ClusterDistributionChannelWriter.java index 2b7f8d4cb6..acde8560a2 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/ClusterDistributionChannelWriter.java +++ b/src/main/java/com/lambdaworks/redis/cluster/ClusterDistributionChannelWriter.java @@ -12,6 +12,8 @@ import com.lambdaworks.redis.RedisException; import com.lambdaworks.redis.api.StatefulRedisConnection; import com.lambdaworks.redis.cluster.models.partitions.Partitions; +import com.lambdaworks.redis.cluster.models.partitions.Partitions; +import com.lambdaworks.redis.protocol.Command; import com.lambdaworks.redis.protocol.CommandArgs; import com.lambdaworks.redis.protocol.CommandKeyword; import com.lambdaworks.redis.protocol.RedisCommand; @@ -31,10 +33,8 @@ class ClusterDistributionChannelWriter implements RedisChannelWriter private boolean closed = false; private int executionLimit = 5; - public ClusterDistributionChannelWriter(RedisChannelWriter defaultWriter, - ClusterConnectionProvider clusterConnectionProvider) { + public ClusterDistributionChannelWriter(RedisChannelWriter defaultWriter) { this.defaultWriter = defaultWriter; - this.clusterConnectionProvider = clusterConnectionProvider; } @Override @@ -174,6 +174,10 @@ public void reset() { clusterConnectionProvider.reset(); } + public void setClusterConnectionProvider(ClusterConnectionProvider clusterConnectionProvider) { + this.clusterConnectionProvider = clusterConnectionProvider; + } + public void setPartitions(Partitions partitions) { clusterConnectionProvider.setPartitions(partitions); } diff --git a/src/main/java/com/lambdaworks/redis/cluster/ClusterNodeCommandHandler.java b/src/main/java/com/lambdaworks/redis/cluster/ClusterNodeCommandHandler.java new file mode 100644 index 0000000000..b4893f7857 --- /dev/null +++ b/src/main/java/com/lambdaworks/redis/cluster/ClusterNodeCommandHandler.java @@ -0,0 +1,106 @@ +package com.lambdaworks.redis.cluster; + +import java.util.Queue; +import java.util.Set; + +import com.google.common.collect.ImmutableSet; +import com.lambdaworks.redis.ClientOptions; +import com.lambdaworks.redis.RedisChannelWriter; +import com.lambdaworks.redis.RedisException; +import com.lambdaworks.redis.protocol.CommandHandler; +import com.lambdaworks.redis.protocol.ConnectionWatchdog; +import com.lambdaworks.redis.protocol.RedisCommand; + +import io.netty.channel.ChannelHandler; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +/** + * @author Mark Paluch + */ +@ChannelHandler.Sharable +class ClusterNodeCommandHandler extends CommandHandler { + + private static final InternalLogger logger = InternalLoggerFactory.getInstance(ClusterNodeCommandHandler.class); + private static final Set CHANNEL_OPEN_STATES = ImmutableSet.of(LifecycleState.ACTIVATING, + LifecycleState.ACTIVE, LifecycleState.CONNECTED); + + private final RedisChannelWriter clusterChannelWriter; + + /** + * Initialize a new instance that handles commands from the supplied queue. + * + * @param clientOptions client options for this connection + * @param queue The command queue + * @param clusterChannelWriter top-most channel writer. + */ + public ClusterNodeCommandHandler(ClientOptions clientOptions, Queue> queue, + RedisChannelWriter clusterChannelWriter) { + super(clientOptions, queue); + this.clusterChannelWriter = clusterChannelWriter; + } + + /** + * Prepare the closing of the channel. + */ + public void prepareClose() { + if (channel != null) { + ConnectionWatchdog connectionWatchdog = channel.pipeline().get(ConnectionWatchdog.class); + if (connectionWatchdog != null) { + connectionWatchdog.setReconnectSuspended(true); + } + } + } + + /** + * Move queued and buffered commands from the inactive connection to the master command writer. This is done only if the + * current connection is disconnected and auto-reconnect is enabled (command-retries). If the connection would be open, we + * could get into a race that the commands we're moving are right now in processing. Alive connections can handle redirects + * and retries on their own. + */ + @Override + public void close() { + + logger.debug("{} close()", logPrefix()); + + if (clusterChannelWriter != null) { + if (isAutoReconnect() && !CHANNEL_OPEN_STATES.contains(getState())) { + for (RedisCommand queuedCommand : queue) { + try { + clusterChannelWriter.write(queuedCommand); + } catch (RedisException e) { + queuedCommand.completeExceptionally(e); + queuedCommand.complete(); + } + } + + queue.clear(); + } + + for (RedisCommand queuedCommand : commandBuffer) { + try { + clusterChannelWriter.write(queuedCommand); + } catch (RedisException e) { + queuedCommand.completeExceptionally(e); + } + } + + commandBuffer.clear(); + } + + super.close(); + } + + public boolean isAutoReconnect() { + return clientOptions.isAutoReconnect(); + } + + public boolean isQueueEmpty() { + if (queue.isEmpty() && commandBuffer.isEmpty()) { + return true; + } + + return false; + } + +} diff --git a/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java b/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java index 493514b2fd..f200e30a22 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java +++ b/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java @@ -6,6 +6,7 @@ import java.util.Map; import java.util.Set; +import com.google.common.base.Supplier; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -14,7 +15,8 @@ import com.google.common.collect.Sets; import com.google.common.net.HostAndPort; import com.google.common.util.concurrent.UncheckedExecutionException; -import com.lambdaworks.redis.LettuceStrings; +import com.lambdaworks.redis.RedisChannelHandler; +import com.lambdaworks.redis.RedisChannelWriter; import com.lambdaworks.redis.RedisException; import com.lambdaworks.redis.RedisURI; import com.lambdaworks.redis.api.StatefulRedisConnection; @@ -36,7 +38,8 @@ class PooledClusterConnectionProvider implements ClusterConnectionProvider { private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledClusterConnectionProvider.class); - private final LoadingCache> connections; + // Contains NodeId-identified and HostAndPort-identified connections. + private final LoadingCache> connections; private final boolean debugEnabled; private final StatefulRedisConnection writers[] = new StatefulRedisConnection[SlotHash.SLOT_COUNT]; private Partitions partitions; @@ -44,31 +47,11 @@ class PooledClusterConnectionProvider implements ClusterConnectionProvider private boolean autoFlushCommands = true; private Object stateLock = new Object(); - public PooledClusterConnectionProvider(final RedisClusterClient redisClusterClient, final RedisCodec redisCodec) { + public PooledClusterConnectionProvider(RedisClusterClient redisClusterClient, RedisChannelWriter clusterWriter, + RedisCodec redisCodec) { this.debugEnabled = logger.isDebugEnabled(); - this.connections = CacheBuilder.newBuilder().build(new CacheLoader>() { - @Override - public StatefulRedisConnection load(PoolKey key) throws Exception { - - Set redisUris = getConnectionPointsOfAllNodes(); - HostAndPort hostAndPort = HostAndPort.fromParts(key.host, key.port); - - if (!redisUris.contains(hostAndPort)) { - throw new IllegalArgumentException("Connection to " + hostAndPort - + " not allowed. This connection point is not known in the cluster view"); - } - - StatefulRedisConnection connection = redisClusterClient.connectToNode(redisCodec, key.getSocketAddress()); - if (key.getIntent() == Intent.READ) { - connection.sync().readOnly(); - } - - synchronized (stateLock) { - connection.setAutoFlushCommands(autoFlushCommands); - } - return connection; - } - }); + this.connections = CacheBuilder.newBuilder().build( + new ConnectionFactory(redisClusterClient, redisCodec, clusterWriter)); } @Override @@ -78,7 +61,13 @@ public StatefulRedisConnection getConnection(Intent intent, int slot) { logger.debug("getConnection(" + intent + ", " + slot + ")"); } - StatefulRedisConnection writer = writers[slot]; + StatefulRedisConnection writer; + + // avoid races when reconfiguring partitions. + synchronized (stateLock) { + writer = writers[slot]; + } + if (writer == null) { RedisClusterNode partition = partitions.getPartitionBySlot(slot); if (partition == null) { @@ -86,8 +75,8 @@ public StatefulRedisConnection getConnection(Intent intent, int slot) { } try { - PoolKey key = new PoolKey(intent, partition.getUri()); - return writers[slot] = getConnection(key); + ConnectionKey key = new ConnectionKey(intent, partition.getNodeId()); + return writers[slot] = connections.get(key); } catch (UncheckedExecutionException e) { throw new RedisException(e.getCause()); } catch (Exception e) { @@ -97,10 +86,6 @@ public StatefulRedisConnection getConnection(Intent intent, int slot) { return writer; } - private StatefulRedisConnection getConnection(PoolKey key) throws java.util.concurrent.ExecutionException { - return connections.get(key); - } - @Override @SuppressWarnings({ "unchecked", "hiding", "rawtypes" }) public StatefulRedisConnection getConnection(Intent intent, String host, int port) { @@ -108,8 +93,16 @@ public StatefulRedisConnection getConnection(Intent intent, String host, i if (debugEnabled) { logger.debug("getConnection(" + intent + ", " + host + ", " + port + ")"); } - PoolKey key = new PoolKey(intent, host, port); - return getConnection(key); + + RedisClusterNode redisClusterNode = getPartition(host, port); + + if (redisClusterNode == null) { + HostAndPort hostAndPort = HostAndPort.fromParts(host, port); + throw invalidConnectionPoint(hostAndPort.toString()); + } + + ConnectionKey key = new ConnectionKey(intent, host, port); + return connections.get(key); } catch (UncheckedExecutionException e) { throw new RedisException(e.getCause()); } catch (Exception e) { @@ -117,11 +110,21 @@ public StatefulRedisConnection getConnection(Intent intent, String host, i } } + private RedisClusterNode getPartition(String host, int port) { + for (RedisClusterNode partition : partitions) { + RedisURI uri = partition.getUri(); + if (port == uri.getPort() && host.equals(uri.getHost())) { + return partition; + } + } + return null; + } + @Override public void close() { - ImmutableMap> copy = ImmutableMap.copyOf(this.connections.asMap()); + ImmutableMap> copy = ImmutableMap.copyOf(this.connections.asMap()); this.connections.invalidateAll(); - resetPartitions(); + resetWriterCache(); for (StatefulRedisConnection kvRedisAsyncConnection : copy.values()) { if (kvRedisAsyncConnection.isOpen()) { kvRedisAsyncConnection.close(); @@ -131,116 +134,98 @@ public void close() { @Override public void reset() { - ImmutableMap> copy = ImmutableMap.copyOf(this.connections.asMap()); + ImmutableMap> copy = ImmutableMap.copyOf(this.connections.asMap()); for (StatefulRedisConnection kvRedisAsyncConnection : copy.values()) { kvRedisAsyncConnection.reset(); } } - private static class PoolKey { - private final ClusterConnectionProvider.Intent intent; - private SocketAddress socketAddress; - private final String host; - private final int port; - - private PoolKey(ClusterConnectionProvider.Intent intent, RedisURI uri) { - this.intent = intent; - this.host = uri.getHost(); - this.port = uri.getPort(); - this.socketAddress = uri.getResolvedAddress(); + /** + * Synchronize on {@code stateLock} to initiate a happens-before relation and clear the thread caches of other threads. + * + * @param partitions the new partitions. + */ + @Override + public void setPartitions(Partitions partitions) { + synchronized (stateLock) { + this.partitions = partitions; + reconfigurePartitions(); } + } - private PoolKey(Intent intent, String host, int port) { - this.intent = intent; - this.host = host; - this.port = port; - } + private void reconfigurePartitions() { + Set staleConnections = getStaleConnectionKeys(); - public ClusterConnectionProvider.Intent getIntent() { - return intent; - } + for (ConnectionKey key : staleConnections) { + StatefulRedisConnection connection = connections.getIfPresent(key); - public SocketAddress getSocketAddress() { + RedisChannelHandler redisChannelHandler = (RedisChannelHandler) connection; - if (socketAddress == null && LettuceStrings.isNotEmpty(host)) { - socketAddress = new InetSocketAddress(host, port); + if (redisChannelHandler.getChannelWriter() instanceof ClusterNodeCommandHandler) { + ClusterNodeCommandHandler clusterNodeCommandHandler = (ClusterNodeCommandHandler) redisChannelHandler + .getChannelWriter(); + clusterNodeCommandHandler.prepareClose(); } - - return socketAddress; } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof PoolKey)) { - return false; - } - - PoolKey poolKey = (PoolKey) o; - - if (port != poolKey.port) { - return false; - } - if (host != null ? !host.equals(poolKey.host) : poolKey.host != null) { - return false; - } - if (intent != poolKey.intent) { - return false; - } + resetWriterCache(); - return true; - } - - @Override - public int hashCode() { - int result = intent != null ? intent.hashCode() : 0; - result = 31 * result + (host != null ? host.hashCode() : 0); - result = 31 * result + port; - return result; + for (ConnectionKey key : staleConnections) { + StatefulRedisConnection connection = connections.getIfPresent(key); + connection.close(); + connections.invalidate(key); } } - @Override - public void setPartitions(Partitions partitions) { - this.partitions = partitions; - resetPartitions(); - } - + /** + * Close stale connections. + */ @Override public void closeStaleConnections() { logger.debug("closeStaleConnections() count before expiring: {}", getConnectionCount()); - Map> map = Maps.newHashMap(connections.asMap()); - Set redisUris = getConnectionPointsOfAllNodes(); - for (PoolKey poolKey : map.keySet()) { - if (redisUris.contains(HostAndPort.fromParts(poolKey.host, poolKey.port))) { - continue; - } + Set stale = getStaleConnectionKeys(); - connections.invalidate(poolKey); - map.get(poolKey).close(); + for (ConnectionKey connectionKey : stale) { + StatefulRedisConnection connection = connections.getIfPresent(connectionKey); + if (connection != null) { + connections.invalidate(connectionKey); + connection.close(); + } } logger.debug("closeStaleConnections() count after expiring: {}", getConnectionCount()); } - protected Set getConnectionPointsOfAllNodes() { - Set redisUris = Sets.newHashSet(); + /** + * Retrieve a set of PoolKey's for all pooled connections that are within the pool but not within the {@link Partitions}. + * + * @return Set of {@link ConnectionKey}s + */ + private Set getStaleConnectionKeys() { + Map> map = Maps.newHashMap(connections.asMap()); + Set stale = Sets.newHashSet(); - for (RedisClusterNode partition : partitions) { - if (partition.getFlags().contains(RedisClusterNode.NodeFlag.MASTER)) { - redisUris.add(HostAndPort.fromParts(partition.getUri().getHost(), partition.getUri().getPort())); + for (ConnectionKey connectionKey : map.keySet()) { + + if (connectionKey.nodeId != null && partitions.getPartitionByNodeId(connectionKey.nodeId) != null) { + continue; } - if (partition.getFlags().contains(RedisClusterNode.NodeFlag.SLAVE)) { - redisUris.add(HostAndPort.fromParts(partition.getUri().getHost(), partition.getUri().getPort())); + if (connectionKey.host != null && getPartition(connectionKey.host, connectionKey.port) != null) { + continue; } + stale.add(connectionKey); } - return redisUris; + return stale; } + /** + * Set auto-flush on all commands. Synchronize on {@code stateLock} to initiate a happens-before relation and clear the + * thread caches of other threads. + * + * @param autoFlush state of autoFlush. + */ @Override public void setAutoFlushCommands(boolean autoFlush) { synchronized (stateLock) { @@ -260,14 +245,150 @@ public void flushCommands() { } } + /** + * + * @return number of connections. + */ protected long getConnectionCount() { return connections.size(); } - protected void resetPartitions() { - + /** + * Reset the internal writer cache. This is necessary because the {@link Partitions} have no reference to the writer cache. + * + * Synchronize on {@code stateLock} to initiate a happens-before relation and clear the thread caches of other threads. + */ + protected void resetWriterCache() { synchronized (stateLock) { Arrays.fill(writers, null); } } + + private RuntimeException invalidConnectionPoint(String message) { + return new IllegalArgumentException("Connection to " + message + + " not allowed. This connection point is not known in the cluster view"); + } + + private Supplier getSocketAddressSupplier(final ConnectionKey connectionKey) { + return new Supplier() { + @Override + public SocketAddress get() { + + if (connectionKey.nodeId != null) { + return getSocketAddress(connectionKey.nodeId); + } + return new InetSocketAddress(connectionKey.host, connectionKey.port); + } + + }; + } + + protected SocketAddress getSocketAddress(String nodeId) { + for (RedisClusterNode partition : partitions) { + if (partition.getNodeId().equals(nodeId)) { + return partition.getUri().getResolvedAddress(); + } + } + return null; + } + + /** + * Connection to identify a connection either by nodeId or host/port. + */ + private static class ConnectionKey { + private final ClusterConnectionProvider.Intent intent; + private final String nodeId; + private final String host; + private final int port; + + public ConnectionKey(Intent intent, String nodeId) { + this.intent = intent; + this.nodeId = nodeId; + this.host = null; + this.port = 0; + } + + public ConnectionKey(Intent intent, String host, int port) { + this.intent = intent; + this.host = host; + this.port = port; + this.nodeId = null; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (!(o instanceof ConnectionKey)) + return false; + + ConnectionKey key = (ConnectionKey) o; + + if (port != key.port) + return false; + if (intent != key.intent) + return false; + if (nodeId != null ? !nodeId.equals(key.nodeId) : key.nodeId != null) + return false; + return !(host != null ? !host.equals(key.host) : key.host != null); + } + + @Override + public int hashCode() { + int result = intent != null ? intent.hashCode() : 0; + result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0); + result = 31 * result + (host != null ? host.hashCode() : 0); + result = 31 * result + port; + return result; + } + } + + private class ConnectionFactory extends CacheLoader> { + + private final RedisClusterClient redisClusterClient; + private final RedisCodec redisCodec; + private final RedisChannelWriter clusterWriter; + + public ConnectionFactory(RedisClusterClient redisClusterClient, RedisCodec redisCodec, + RedisChannelWriter clusterWriter) { + this.redisClusterClient = redisClusterClient; + this.redisCodec = redisCodec; + this.clusterWriter = clusterWriter; + } + + @Override + public StatefulRedisConnection load(ConnectionKey key) throws Exception { + + StatefulRedisConnection connection = null; + if (key.nodeId != null) { + if (partitions.getPartitionByNodeId(key.nodeId) == null) { + throw invalidConnectionPoint("node id " + key.nodeId); + } + + // NodeId connections provide command recovery due to cluster reconfiguration + connection = redisClusterClient.connectToNode(redisCodec, key.nodeId, clusterWriter, + getSocketAddressSupplier(key)); + } + + if (key.host != null) { + if (getPartition(key.host, key.port) == null) { + throw invalidConnectionPoint(key.host + ":" + key.port); + } + + // Host and port connections do not provide command recovery due to cluster reconfiguration + connection = redisClusterClient.connectToNode(redisCodec, key.host + ":" + key.port, null, + getSocketAddressSupplier(key)); + } + + if (key.intent == Intent.READ) { + connection.sync().readOnly(); + } + + synchronized (stateLock) { + connection.setAutoFlushCommands(autoFlushCommands); + } + + return connection; + } + } } diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterAsyncConnection.java b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterAsyncConnection.java index f1dc5f9ad1..9b0979d280 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterAsyncConnection.java +++ b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterAsyncConnection.java @@ -15,13 +15,16 @@ public interface RedisAdvancedClusterAsyncConnection extends RedisClusterAsyncConnection { /** - * Retrieve a connection to the specified cluster node using the nodeId. Host and port are looked up in the node list. - * + * Retrieve a connection to the specified cluster node using the nodeId. Host and port are looked up in the node list. This + * connection is bound to the node id. Once the cluster topology view is updated, the connection will try to reconnect the + * to the node with the specified {@code nodeId}, that behavior can also lead to a closed connection once the node with the + * specified {@code nodeId} is no longer part of the cluster. + * * Do not close the connections. Otherwise, unpredictable behavior will occur. The nodeId must be part of the cluster and is * validated against the current topology view in {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. - * + * * In contrast to the {@link RedisAdvancedClusterAsyncConnection}, node-connections do not route commands to other cluster - * nodes + * nodes. * * @param nodeId the node Id * @return a connection to the requested cluster node @@ -30,10 +33,14 @@ public interface RedisAdvancedClusterAsyncConnection extends RedisClusterA RedisClusterAsyncConnection getConnection(String nodeId); /** - * Retrieve a connection to the specified cluster node using the nodeId. Do not close the connections. Otherwise, - * unpredictable behavior will occur. The node must be part of the cluster and host/port are validated (exact check) against - * the current topology view in {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. + * Retrieve a connection to the specified cluster node using the nodeId. This connection is bound to a host and port. + * Updates to the cluster topology view can close the connection once the host, identified by {@code host} and {@code port}, + * are no longer part of the cluster. * + * Do not close the connections. Otherwise, unpredictable behavior will occur. The node must be part of the cluster and + * host/port are validated (exact check) against the current topology view in + * {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. + * * In contrast to the {@link RedisAdvancedClusterAsyncConnection}, node-connections do not route commands to other cluster * nodes. * diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnection.java b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnection.java index 58ff6adad8..1edc10ec56 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnection.java +++ b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnection.java @@ -15,13 +15,16 @@ public interface RedisAdvancedClusterConnection extends RedisClusterConnection { /** - * Retrieve a connection to the specified cluster node using the nodeId. Host and port are looked up in the node list. + * Retrieve a connection to the specified cluster node using the nodeId. Host and port are looked up in the node list. This + * connection is bound to the node id. Once the cluster topology view is updated, the connection will try to reconnect the + * to the node with the specified {@code nodeId}, that behavior can also lead to a closed connection once the node with the + * specified {@code nodeId} is no longer part of the cluster. * * Do not close the connections. Otherwise, unpredictable behavior will occur. The nodeId must be part of the cluster and is * validated against the current topology view in {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. * * In contrast to the {@link RedisAdvancedClusterConnection}, node-connections do not route commands to other cluster nodes. - * + * * @param nodeId the node Id * @return a connection to the requested cluster node * @throws RedisException if the requested node identified by {@code nodeId} is not part of the cluster @@ -29,9 +32,13 @@ public interface RedisAdvancedClusterConnection extends RedisClusterConnec RedisClusterConnection getConnection(String nodeId); /** - * Retrieve a connection to the specified cluster node using the nodeId. Do not close the connections. Otherwise, - * unpredictable behavior will occur. The node must be part of the cluster and host/port are validated (exact check) against - * the current topology view in {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. + * Retrieve a connection to the specified cluster node using the nodeId. This connection is bound to a host and port. + * Updates to the cluster topology view can close the connection once the host, identified by {@code host} and {@code port}, + * are no longer part of the cluster. + * + * Do not close the connections. Otherwise, unpredictable behavior will occur. The node must be part of the cluster and + * host/port are validated (exact check) against the current topology view in + * {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. * * In contrast to the {@link RedisAdvancedClusterConnection}, node-connections do not route commands to other cluster nodes. * diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java index c91f9ef9bf..0cf9cacac6 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java +++ b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java @@ -11,7 +11,6 @@ import java.util.List; import java.util.Map; import java.util.Queue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Predicate; @@ -200,28 +199,37 @@ public RedisAdvancedClusterAsyncCommands connectClusterAsync(RedisC return connectClusterImpl(codec, getSocketAddressSupplier()).async(); } - protected StatefulRedisConnection connectToNode(SocketAddress socketAddress) { - return connectToNode(newStringStringCodec(), socketAddress); + protected StatefulRedisConnection connectToNode(final SocketAddress socketAddress) { + return connectToNode(newStringStringCodec(), socketAddress.toString(), null, new Supplier() { + @Override + public SocketAddress get() { + return socketAddress; + } + }); } /** * Create a connection to a redis socket address. + * + * @param codec Use this codec to encode/decode keys and values. + * @param nodeId the nodeId + * @param clusterWriter global cluster writer + * @param socketAddressSupplier supplier for the socket address * - * @param socketAddress initial connect * @param Key type. * @param Value type. * @return a new connection */ - StatefulRedisConnection connectToNode(RedisCodec codec, final SocketAddress socketAddress) { - - logger.debug("connectAsyncImpl(" + socketAddress + ")"); - Queue> queue = new ArrayDeque>(); + StatefulRedisConnection connectToNode(RedisCodec codec, String nodeId, + RedisChannelWriter clusterWriter, final Supplier socketAddressSupplier) { - CommandHandler handler = new CommandHandler(clientOptions, queue); + logger.debug("connectNode(" + nodeId + ")"); + Queue> queue = new ArrayDeque<>(); + ClusterNodeCommandHandler handler = new ClusterNodeCommandHandler(clientOptions, queue, clusterWriter); StatefulRedisConnectionImpl connection = new StatefulRedisConnectionImpl(handler, codec, timeout, unit); - connectAsyncImpl(handler, connection, () -> socketAddress); + connectAsyncImpl(handler, connection, socketAddressSupplier); connection.registerCloseables(closeableResources, connection); @@ -254,12 +262,13 @@ StatefulRedisClusterConnectionImpl connectClusterImpl(RedisCodec handler = new CommandHandler(clientOptions, queue); - final PooledClusterConnectionProvider pooledClusterConnectionProvider = new PooledClusterConnectionProvider( - this, codec); + ClusterDistributionChannelWriter clusterWriter = new ClusterDistributionChannelWriter(handler); + PooledClusterConnectionProvider pooledClusterConnectionProvider = new PooledClusterConnectionProvider(this, + clusterWriter, codec); + + clusterWriter.setClusterConnectionProvider(pooledClusterConnectionProvider); - final ClusterDistributionChannelWriter clusterWriter = new ClusterDistributionChannelWriter(handler, - pooledClusterConnectionProvider); - StatefulRedisClusterConnectionImpl connection = new StatefulRedisClusterConnectionImpl<>(clusterWriter, codec, + StatefulRedisClusterConnectionImpl connection = new StatefulRedisClusterConnectionImpl(clusterWriter, codec, timeout, unit); connection.setPartitions(partitions); @@ -392,6 +401,11 @@ protected Utf8StringCodec newStringStringCodec() { return new Utf8StringCodec(); } + /** + * Sets the new cluster topology. The partitions are not applied to existing connections. + * + * @param partitions partitions object + */ public void setPartitions(Partitions partitions) { this.partitions = partitions; } diff --git a/src/main/java/com/lambdaworks/redis/cluster/api/StatefulRedisClusterConnection.java b/src/main/java/com/lambdaworks/redis/cluster/api/StatefulRedisClusterConnection.java index 08b19751cf..028965f2b1 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/api/StatefulRedisClusterConnection.java +++ b/src/main/java/com/lambdaworks/redis/cluster/api/StatefulRedisClusterConnection.java @@ -3,8 +3,6 @@ import com.lambdaworks.redis.RedisException; import com.lambdaworks.redis.api.StatefulConnection; import com.lambdaworks.redis.api.StatefulRedisConnection; -import com.lambdaworks.redis.cluster.RedisAdvancedClusterAsyncConnection; -import com.lambdaworks.redis.cluster.RedisAdvancedClusterConnection; import com.lambdaworks.redis.cluster.api.async.RedisAdvancedClusterAsyncCommands; import com.lambdaworks.redis.cluster.api.rx.RedisAdvancedClusterReactiveCommands; import com.lambdaworks.redis.cluster.api.sync.RedisAdvancedClusterCommands; @@ -44,13 +42,16 @@ public interface StatefulRedisClusterConnection extends StatefulConnection RedisAdvancedClusterReactiveCommands reactive(); /** - * Retrieve a connection to the specified cluster node using the nodeId. Host and port are looked up in the node list. + * Retrieve a connection to the specified cluster node using the nodeId. Host and port are looked up in the node list. This + * connection is bound to the node id. Once the cluster topology view is updated, the connection will try to reconnect the + * to the node with the specified {@code nodeId}, that behavior can also lead to a closed connection once the node with the + * specified {@code nodeId} is no longer part of the cluster. * * Do not close the connections. Otherwise, unpredictable behavior will occur. The nodeId must be part of the cluster and is * validated against the current topology view in {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. + * * - * In contrast to the {@link StatefulRedisClusterConnection}, node-connections do not route commands to other cluster - * nodes. + * In contrast to the {@link StatefulRedisClusterConnection}, node-connections do not route commands to other cluster nodes. * * @param nodeId the node Id * @return a connection to the requested cluster node @@ -59,12 +60,15 @@ public interface StatefulRedisClusterConnection extends StatefulConnection StatefulRedisConnection getConnection(String nodeId); /** - * Retrieve a connection to the specified cluster node using the nodeId. Do not close the connections. Otherwise, - * unpredictable behavior will occur. The node must be part of the cluster and host/port are validated (exact check) against - * the current topology view in {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. + * Retrieve a connection to the specified cluster node using the nodeId. This connection is bound to a host and port. + * Updates to the cluster topology view can close the connection once the host, identified by {@code host} and {@code port}, + * are no longer part of the cluster. * - * In contrast to the {@link StatefulRedisClusterConnection}, node-connections do not route commands to other cluster - * nodes. + * Do not close the connections. Otherwise, unpredictable behavior will occur. The node must be part of the cluster and + * host/port are validated (exact check) against the current topology view in + * {@link com.lambdaworks.redis.cluster.models.partitions.Partitions}. + * + * In contrast to the {@link StatefulRedisClusterConnection}, node-connections do not route commands to other cluster nodes. * * @param host the host * @param port the port diff --git a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java index a88af243e0..065ccd0753 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java +++ b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java @@ -48,10 +48,10 @@ public class CommandHandler extends ChannelDuplexHandler implements RedisC protected Queue> commandBuffer = new ArrayDeque>(); protected ByteBuf buffer; protected RedisStateMachine rsm; + protected Channel channel; private LifecycleState lifecycleState = LifecycleState.NOT_CONNECTED; private Object stateLock = new Object(); - private Channel channel; /** * If TRACE level logging has been enabled at startup. @@ -389,6 +389,10 @@ protected void setState(LifecycleState lifecycleState) { } } + protected LifecycleState getState() { + return lifecycleState; + } + private void cancelCommands(String message) { int size = 0; if (queue != null) { @@ -511,7 +515,7 @@ public void setAutoFlushCommands(boolean autoFlush) { } } - private String logPrefix() { + protected String logPrefix() { if (logPrefix != null) { return logPrefix; } @@ -520,9 +524,7 @@ private String logPrefix() { return logPrefix = buffer.toString(); } - @VisibleForTesting - enum LifecycleState { - + public enum LifecycleState { NOT_CONNECTED, REGISTERED, CONNECTED, ACTIVATING, ACTIVE, DISCONNECTED, DEACTIVATING, DEACTIVATED, CLOSED, }