diff --git a/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java b/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java index 58e6a02449..7176f6de85 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java @@ -175,7 +175,7 @@ public void punsubscribed(K pattern, long count) { } private RedisClusterNode getNode() { - return nodeId != null ? getPartitions().getPartitionByNodeId(nodeId) : getPartition(host, port); + return nodeId != null ? getPartitions().getPartitionByNodeId(nodeId) : getPartitions().getPartition(host, port); } } } diff --git a/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java b/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java index 942b007afa..c67f293fcc 100644 --- a/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java +++ b/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java @@ -379,7 +379,7 @@ private void beforeGetConnection(Intent intent, String host, int port) { } if (validateClusterNodeMembership()) { - RedisClusterNode redisClusterNode = getPartition(host, port); + RedisClusterNode redisClusterNode = partitions.getPartition(host, port); if (redisClusterNode == null) { HostAndPort hostAndPort = HostAndPort.of(host, port); @@ -388,17 +388,6 @@ private void beforeGetConnection(Intent intent, String host, int port) { } } - protected RedisClusterNode getPartition(String host, int port) { - - for (RedisClusterNode partition : partitions) { - RedisURI uri = partition.getUri(); - if (port == uri.getPort() && host.equals(uri.getHost())) { - return partition; - } - } - return null; - } - @Override public void close() { closeAsync().join(); @@ -476,7 +465,7 @@ private boolean isStale(ConnectionKey connectionKey) { return false; } - if (connectionKey.host != null && getPartition(connectionKey.host, connectionKey.port) != null) { + if (connectionKey.host != null && partitions.getPartition(connectionKey.host, connectionKey.port) != null) { return false; } @@ -581,7 +570,7 @@ public ConnectionFuture> apply(ConnectionKey key) if (key.host != null) { if (validateClusterNodeMembership()) { - if (getPartition(key.host, key.port) == null) { + if (partitions.getPartition(key.host, key.port) == null) { throw invalidConnectionPoint(key.host + ":" + key.port); } } diff --git a/src/main/java/io/lettuce/core/cluster/models/partitions/Partitions.java b/src/main/java/io/lettuce/core/cluster/models/partitions/Partitions.java index 62a28d661c..842800297f 100644 --- a/src/main/java/io/lettuce/core/cluster/models/partitions/Partitions.java +++ b/src/main/java/io/lettuce/core/cluster/models/partitions/Partitions.java @@ -17,6 +17,7 @@ import java.util.*; +import io.lettuce.core.RedisURI; import io.lettuce.core.cluster.SlotHash; import io.lettuce.core.internal.LettuceAssert; @@ -61,8 +62,8 @@ public class Partitions implements Collection { /** * Retrieve a {@link RedisClusterNode} by its slot number. This method does not distinguish between masters and slaves. * - * @param slot the slot - * @return RedisClusterNode or {@literal null} + * @param slot the slot hash. + * @return the {@link RedisClusterNode} or {@literal null} if not found. */ public RedisClusterNode getPartitionBySlot(int slot) { return slotCache[slot]; @@ -71,8 +72,8 @@ public RedisClusterNode getPartitionBySlot(int slot) { /** * Retrieve a {@link RedisClusterNode} by its node id. * - * @param nodeId the nodeId - * @return RedisClusterNode or {@literal null} + * @param nodeId the nodeId. + * @return the {@link RedisClusterNode} or {@literal null} if not found. */ public RedisClusterNode getPartitionByNodeId(String nodeId) { @@ -81,9 +82,42 @@ public RedisClusterNode getPartitionByNodeId(String nodeId) { return partition; } } + return null; } + /** + * Retrieve a {@link RedisClusterNode} by its hostname/port considering node aliases. + * + * @param host hostname. + * @param port port number. + * @return the {@link RedisClusterNode} or {@literal null} if not found. + */ + public RedisClusterNode getPartition(String host, int port) { + + for (RedisClusterNode partition : nodeReadView) { + + RedisURI uri = partition.getUri(); + + if (matches(uri, host, port)) { + return partition; + } + + for (RedisURI redisURI : partition.getAliases()) { + + if (matches(redisURI, host, port)) { + return partition; + } + } + } + + return null; + } + + private static boolean matches(RedisURI uri, String host, int port) { + return uri.getPort() == port && host.equals(uri.getHost()); + } + /** * Update the partition cache. Updates are necessary after the partition details have changed. */ diff --git a/src/main/java/io/lettuce/core/cluster/models/partitions/RedisClusterNode.java b/src/main/java/io/lettuce/core/cluster/models/partitions/RedisClusterNode.java index e6e05d331b..a3c719bf87 100644 --- a/src/main/java/io/lettuce/core/cluster/models/partitions/RedisClusterNode.java +++ b/src/main/java/io/lettuce/core/cluster/models/partitions/RedisClusterNode.java @@ -49,6 +49,7 @@ public class RedisClusterNode implements Serializable, RedisNodeDescription { private BitSet slots; private final Set flags = EnumSet.noneOf(NodeFlag.class); + private final List aliases = new ArrayList<>(); public RedisClusterNode() { } @@ -79,6 +80,7 @@ public RedisClusterNode(RedisClusterNode redisClusterNode) { this.pingSentTimestamp = redisClusterNode.pingSentTimestamp; this.pongReceivedTimestamp = redisClusterNode.pongReceivedTimestamp; this.configEpoch = redisClusterNode.configEpoch; + this.aliases.addAll(redisClusterNode.aliases); if (redisClusterNode.slots != null && !redisClusterNode.slots.isEmpty()) { this.slots = new BitSet(SlotHash.SLOT_COUNT); @@ -271,6 +273,21 @@ public boolean is(NodeFlag nodeFlag) { return getFlags().contains(nodeFlag); } + /** + * Add an alias to {@link RedisClusterNode}. + * + * @param alias must not be {@literal null}. + */ + public void addAlias(RedisURI alias) { + + LettuceAssert.notNull(alias, "Alias URI must not be null"); + this.aliases.add(alias); + } + + public List getAliases() { + return aliases; + } + /** * @param slot the slot hash * @return true if the slot is contained within the handled slots. @@ -324,6 +341,7 @@ public String toString() { sb.append(", pongReceivedTimestamp=").append(pongReceivedTimestamp); sb.append(", configEpoch=").append(configEpoch); sb.append(", flags=").append(flags); + sb.append(", aliases=").append(aliases); if (slots != null) { sb.append(", slot count=").append(slots.cardinality()); } diff --git a/src/main/java/io/lettuce/core/cluster/topology/ClusterTopologyRefresh.java b/src/main/java/io/lettuce/core/cluster/topology/ClusterTopologyRefresh.java index 3c64b88490..79fd7d8029 100644 --- a/src/main/java/io/lettuce/core/cluster/topology/ClusterTopologyRefresh.java +++ b/src/main/java/io/lettuce/core/cluster/topology/ClusterTopologyRefresh.java @@ -122,15 +122,22 @@ NodeTopologyViews getNodeSpecificViews(Requests requestedTopology, Requests requ Set nodes = requestedTopology.nodes(); List views = new ArrayList<>(); - for (RedisURI node : nodes) { + for (RedisURI nodeUri : nodes) { try { - NodeTopologyView nodeTopologyView = NodeTopologyView.from(node, requestedTopology, requestedClients); + NodeTopologyView nodeTopologyView = NodeTopologyView.from(nodeUri, requestedTopology, requestedClients); if (!nodeTopologyView.isAvailable()) { continue; } + RedisClusterNode node = nodeTopologyView.getOwnPartition(); + if (node.getUri() == null) { + node.setUri(nodeUri); + } else { + node.addAlias(nodeUri); + } + List nodeWithStats = nodeTopologyView.getPartitions() // .stream() // .filter(ClusterTopologyRefresh::validNode) // @@ -140,10 +147,6 @@ NodeTopologyViews getNodeSpecificViews(Requests requestedTopology, Requests requ .filter(partition -> partition.is(RedisClusterNode.NodeFlag.MYSELF)) // .forEach(partition -> { - if (partition.getUri() == null) { - partition.setUri(node); - } - // record latency for later partition ordering latencies.put(partition.getNodeId(), nodeTopologyView.getLatency()); clientCountByNodeId.put(partition.getNodeId(), nodeTopologyView.getConnectedClients()); @@ -158,7 +161,7 @@ NodeTopologyViews getNodeSpecificViews(Requests requestedTopology, Requests requ views.add(nodeTopologyView); } catch (ExecutionException e) { - logger.warn(String.format("Cannot retrieve partition view from %s, error: %s", node, e)); + logger.warn(String.format("Cannot retrieve partition view from %s, error: %s", nodeUri, e)); } } diff --git a/src/main/java/io/lettuce/core/cluster/topology/NodeTopologyView.java b/src/main/java/io/lettuce/core/cluster/topology/NodeTopologyView.java index 48a7d69ca3..b59096de74 100644 --- a/src/main/java/io/lettuce/core/cluster/topology/NodeTopologyView.java +++ b/src/main/java/io/lettuce/core/cluster/topology/NodeTopologyView.java @@ -57,12 +57,9 @@ class NodeTopologyView { this.partitions = ClusterPartitionParser.parse(clusterNodes); this.connectedClients = getClients(clientList); - this.clusterNodes = clusterNodes; this.clientList = clientList; this.latency = latency; - - getOwnPartition().setUri(redisURI); } static NodeTopologyView from(RedisURI redisURI, Requests clusterNodesRequests, Requests clientListRequests) @@ -111,10 +108,15 @@ String getNodeId() { } RedisURI getRedisURI() { + + if (partitions.isEmpty()) { + return redisURI; + } + return getOwnPartition().getUri(); } - private RedisClusterNode getOwnPartition() { + RedisClusterNode getOwnPartition() { for (RedisClusterNode partition : partitions) { if (partition.is(RedisClusterNode.NodeFlag.MYSELF)) { return partition; diff --git a/src/main/java/io/lettuce/core/cluster/topology/NodeTopologyViews.java b/src/main/java/io/lettuce/core/cluster/topology/NodeTopologyViews.java index 1d42675c46..c65004fa3d 100644 --- a/src/main/java/io/lettuce/core/cluster/topology/NodeTopologyViews.java +++ b/src/main/java/io/lettuce/core/cluster/topology/NodeTopologyViews.java @@ -26,7 +26,7 @@ */ class NodeTopologyViews { - private List views = new ArrayList<>(); + private List views; public NodeTopologyViews(List views) { this.views = views; diff --git a/src/test/java/io/lettuce/core/cluster/models/partitions/PartitionsTest.java b/src/test/java/io/lettuce/core/cluster/models/partitions/PartitionsTest.java index 8deeb3128d..af07a6b42a 100644 --- a/src/test/java/io/lettuce/core/cluster/models/partitions/PartitionsTest.java +++ b/src/test/java/io/lettuce/core/cluster/models/partitions/PartitionsTest.java @@ -36,7 +36,7 @@ public class PartitionsTest { Arrays.asList(4, 5, 6), new HashSet<>()); @Test - public void contains() throws Exception { + public void contains() { Partitions partitions = new Partitions(); partitions.add(node1); @@ -46,7 +46,7 @@ public void contains() throws Exception { } @Test - public void containsUsesReadView() throws Exception { + public void containsUsesReadView() { Partitions partitions = new Partitions(); partitions.getPartitions().add(node1); @@ -57,7 +57,7 @@ public void containsUsesReadView() throws Exception { } @Test - public void containsAll() throws Exception { + public void containsAll() { Partitions partitions = new Partitions(); partitions.add(node1); @@ -67,7 +67,7 @@ public void containsAll() throws Exception { } @Test - public void containsAllUsesReadView() throws Exception { + public void containsAllUsesReadView() { Partitions partitions = new Partitions(); partitions.getPartitions().add(node1); @@ -78,7 +78,7 @@ public void containsAllUsesReadView() throws Exception { } @Test - public void add() throws Exception { + public void add() { Partitions partitions = new Partitions(); partitions.add(node1); @@ -89,7 +89,7 @@ public void add() throws Exception { } @Test - public void addPartitionClearsCache() throws Exception { + public void addPartitionClearsCache() { Partitions partitions = new Partitions(); partitions.addPartition(node1); @@ -98,7 +98,7 @@ public void addPartitionClearsCache() throws Exception { } @Test - public void addAll() throws Exception { + public void addAll() { Partitions partitions = new Partitions(); partitions.addAll(Arrays.asList(node1, node2)); @@ -108,7 +108,7 @@ public void addAll() throws Exception { } @Test - public void getPartitionBySlot() throws Exception { + public void getPartitionBySlot() { Partitions partitions = new Partitions(); @@ -119,7 +119,19 @@ public void getPartitionBySlot() throws Exception { } @Test - public void remove() throws Exception { + public void getPartitionByAlias() { + + Partitions partitions = new Partitions(); + node1.addAlias(RedisURI.create("foobar", 1234)); + partitions.add(node1); + + assertThat(partitions.getPartition(node1.getUri().getHost(), node1.getUri().getPort())).isEqualTo(node1); + assertThat(partitions.getPartition("foobar", 1234)).isEqualTo(node1); + assertThat(partitions.getPartition("unknown", 1234)).isNull(); + } + + @Test + public void remove() { Partitions partitions = new Partitions(); partitions.addAll(Arrays.asList(node1, node2)); @@ -130,7 +142,7 @@ public void remove() throws Exception { } @Test - public void removeAll() throws Exception { + public void removeAll() { Partitions partitions = new Partitions(); partitions.addAll(Arrays.asList(node1, node2)); @@ -141,7 +153,7 @@ public void removeAll() throws Exception { } @Test - public void clear() throws Exception { + public void clear() { Partitions partitions = new Partitions(); partitions.addAll(Arrays.asList(node1, node2)); @@ -152,7 +164,7 @@ public void clear() throws Exception { } @Test - public void retainAll() throws Exception { + public void retainAll() { Partitions partitions = new Partitions(); partitions.addAll(Arrays.asList(node1, node2)); @@ -163,7 +175,7 @@ public void retainAll() throws Exception { } @Test - public void toArray() throws Exception { + public void toArray() { Partitions partitions = new Partitions(); partitions.addAll(Arrays.asList(node1, node2)); @@ -172,7 +184,7 @@ public void toArray() throws Exception { } @Test - public void toArrayUsesReadView() throws Exception { + public void toArrayUsesReadView() { Partitions partitions = new Partitions(); partitions.getPartitions().addAll(Arrays.asList(node1, node2)); @@ -183,7 +195,7 @@ public void toArrayUsesReadView() throws Exception { } @Test - public void toArray2() throws Exception { + public void toArray2() { Partitions partitions = new Partitions(); partitions.addAll(Arrays.asList(node1, node2)); @@ -192,7 +204,7 @@ public void toArray2() throws Exception { } @Test - public void toArray2UsesReadView() throws Exception { + public void toArray2UsesReadView() { Partitions partitions = new Partitions(); partitions.getPartitions().addAll(Arrays.asList(node1, node2)); @@ -205,7 +217,7 @@ public void toArray2UsesReadView() throws Exception { } @Test - public void getPartitionByNodeId() throws Exception { + public void getPartitionByNodeId() { Partitions partitions = new Partitions(); partitions.add(node1); @@ -216,7 +228,7 @@ public void getPartitionByNodeId() throws Exception { } @Test - public void reload() throws Exception { + public void reload() { RedisClusterNode other = new RedisClusterNode(RedisURI.create("localhost", 6666), "c", true, "", 0, 0, 0, Arrays.asList(1, 2, 3, 4, 5, 6), new HashSet<>()); @@ -231,7 +243,7 @@ public void reload() throws Exception { } @Test - public void reloadEmpty() throws Exception { + public void reloadEmpty() { Partitions partitions = new Partitions(); partitions.reload(Arrays.asList()); @@ -240,7 +252,7 @@ public void reloadEmpty() throws Exception { } @Test - public void isEmpty() throws Exception { + public void isEmpty() { Partitions partitions = new Partitions(); partitions.add(node1); @@ -249,7 +261,7 @@ public void isEmpty() throws Exception { } @Test - public void size() throws Exception { + public void size() { Partitions partitions = new Partitions(); partitions.add(node1); @@ -258,7 +270,7 @@ public void size() throws Exception { } @Test - public void sizeUsesReadView() throws Exception { + public void sizeUsesReadView() { Partitions partitions = new Partitions(); partitions.getPartitions().add(node1); @@ -271,7 +283,7 @@ public void sizeUsesReadView() throws Exception { } @Test - public void getPartition() throws Exception { + public void getPartition() { Partitions partitions = new Partitions(); partitions.add(node1); @@ -280,7 +292,7 @@ public void getPartition() throws Exception { } @Test - public void iterator() throws Exception { + public void iterator() { Partitions partitions = new Partitions(); partitions.add(node1); @@ -289,7 +301,7 @@ public void iterator() throws Exception { } @Test - public void iteratorUsesReadView() throws Exception { + public void iteratorUsesReadView() { Partitions partitions = new Partitions(); partitions.getPartitions().add(node1); @@ -301,7 +313,7 @@ public void iteratorUsesReadView() throws Exception { } @Test - public void iteratorIsSafeDuringUpdate() throws Exception { + public void iteratorIsSafeDuringUpdate() { Partitions partitions = new Partitions(); partitions.add(node1); @@ -325,7 +337,7 @@ public void iteratorIsSafeDuringUpdate() throws Exception { } @Test - public void testToString() throws Exception { + public void testToString() { Partitions partitions = new Partitions(); partitions.add(node1); diff --git a/src/test/java/io/lettuce/core/cluster/models/partitions/RedisClusterNodeTest.java b/src/test/java/io/lettuce/core/cluster/models/partitions/RedisClusterNodeTest.java index 05f121b589..19f0c8bcdb 100644 --- a/src/test/java/io/lettuce/core/cluster/models/partitions/RedisClusterNodeTest.java +++ b/src/test/java/io/lettuce/core/cluster/models/partitions/RedisClusterNodeTest.java @@ -34,12 +34,14 @@ public void shouldCopyNode() { RedisClusterNode node = new RedisClusterNode(); node.setSlots(Arrays.asList(1, 2, 3, SlotHash.SLOT_COUNT - 1)); + node.addAlias(RedisURI.create("foo", 6379)); RedisClusterNode copy = new RedisClusterNode(node); assertThat(copy.getSlots()).containsExactly(1, 2, 3, SlotHash.SLOT_COUNT - 1); assertThat(copy.hasSlot(1)).isTrue(); assertThat(copy.hasSlot(SlotHash.SLOT_COUNT - 1)).isTrue(); + assertThat(copy.getAliases()).contains(RedisURI.create("foo", 6379)); } @Test diff --git a/src/test/java/io/lettuce/core/cluster/topology/NodeTopologyViewsTest.java b/src/test/java/io/lettuce/core/cluster/topology/NodeTopologyViewsTest.java index c4fa57bac4..bf7b8bcf48 100644 --- a/src/test/java/io/lettuce/core/cluster/topology/NodeTopologyViewsTest.java +++ b/src/test/java/io/lettuce/core/cluster/topology/NodeTopologyViewsTest.java @@ -30,9 +30,9 @@ public class NodeTopologyViewsTest { @Test - public void shouldReuseKnownUris() throws Exception { + public void shouldReuseKnownUris() { - RedisURI localhost = RedisURI.create("localhost", 6479); + RedisURI localhost = RedisURI.create("127.0.0.1", 6479); RedisURI otherhost = RedisURI.create("127.0.0.2", 7000); RedisURI host3 = RedisURI.create("127.0.0.3", 7000); @@ -41,7 +41,7 @@ public void shouldReuseKnownUris() throws Exception { + "2 127.0.0.2:7000 master - 111 1401258245007 222 connected 7000 12000 12002-16383\n" + "3 127.0.0.3:7000 master - 111 1401258245007 222 connected 7000 12000 12002-16383\n"; - String viewByOtherhost = "1 127.0.0.1:6479 master - 0 1401258245007 2 connected 8000-11999\n" + String viewByOtherhost = "1 127.0.0.2:6479 master - 0 1401258245007 2 connected 8000-11999\n" + "2 127.0.0.2:7000 master,myself - 111 1401258245007 222 connected 7000 12000 12002-16383\n" + "3 127.0.0.3:7000 master - 111 1401258245007 222 connected 7000 12000 12002-16383\n"; @@ -55,12 +55,12 @@ public void shouldReuseKnownUris() throws Exception { } @Test(expected = IllegalStateException.class) - public void shouldFailWithoutOwnPartition() throws Exception { + public void shouldFailWithoutOwnPartition() { - RedisURI localhost = RedisURI.create("localhost", 6479); + RedisURI localhost = RedisURI.create("127.0.0.1", 6479); String viewByLocalhost = "1 127.0.0.1:6479 master - 0 1401258245007 2 connected 8000-11999\n"; - new NodeTopologyView(localhost, viewByLocalhost, "", 0); + new NodeTopologyView(localhost, viewByLocalhost, "", 0).getOwnPartition(); } }