From d3b707070acfd9df17e3c58e87538ad4f4795fdb Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 19 Mar 2018 17:10:04 +0100 Subject: [PATCH] Add alias to RedisClusterNode #712 Lettuce now provides a cluster node alias to better cope with load balancers or alternate cluster names. We are using the Redis reported hostname as URI (was previously the seed node URI) and add the seed node URI as alias. This way we make sure to connect the appropriate host directly without connecting the balancer and receiving redirects. Redirects can happen in an environment where the load balancer forwards connections to random cluster master nodes. --- .../ClusterPubSubConnectionProvider.java | 2 +- .../PooledClusterConnectionProvider.java | 17 +---- .../cluster/models/partitions/Partitions.java | 42 ++++++++++-- .../models/partitions/RedisClusterNode.java | 18 +++++ .../topology/ClusterTopologyRefresh.java | 17 +++-- .../cluster/topology/NodeTopologyView.java | 10 +-- .../cluster/topology/NodeTopologyViews.java | 2 +- .../models/partitions/PartitionsTest.java | 66 +++++++++++-------- .../partitions/RedisClusterNodeTest.java | 2 + .../topology/NodeTopologyViewsTest.java | 12 ++-- 10 files changed, 124 insertions(+), 64 deletions(-) diff --git a/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java b/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java index 58e6a02449..7176f6de85 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java @@ -175,7 +175,7 @@ public void punsubscribed(K pattern, long count) { } private RedisClusterNode getNode() { - return nodeId != null ? getPartitions().getPartitionByNodeId(nodeId) : getPartition(host, port); + return nodeId != null ? getPartitions().getPartitionByNodeId(nodeId) : getPartitions().getPartition(host, port); } } } diff --git a/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java b/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java index 942b007afa..c67f293fcc 100644 --- a/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java +++ b/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java @@ -379,7 +379,7 @@ private void beforeGetConnection(Intent intent, String host, int port) { } if (validateClusterNodeMembership()) { - RedisClusterNode redisClusterNode = getPartition(host, port); + RedisClusterNode redisClusterNode = partitions.getPartition(host, port); if (redisClusterNode == null) { HostAndPort hostAndPort = HostAndPort.of(host, port); @@ -388,17 +388,6 @@ private void beforeGetConnection(Intent intent, String host, int port) { } } - protected RedisClusterNode getPartition(String host, int port) { - - for (RedisClusterNode partition : partitions) { - RedisURI uri = partition.getUri(); - if (port == uri.getPort() && host.equals(uri.getHost())) { - return partition; - } - } - return null; - } - @Override public void close() { closeAsync().join(); @@ -476,7 +465,7 @@ private boolean isStale(ConnectionKey connectionKey) { return false; } - if (connectionKey.host != null && getPartition(connectionKey.host, connectionKey.port) != null) { + if (connectionKey.host != null && partitions.getPartition(connectionKey.host, connectionKey.port) != null) { return false; } @@ -581,7 +570,7 @@ public ConnectionFuture> apply(ConnectionKey key) if (key.host != null) { if (validateClusterNodeMembership()) { - if (getPartition(key.host, key.port) == null) { + if (partitions.getPartition(key.host, key.port) == null) { throw invalidConnectionPoint(key.host + ":" + key.port); } } diff --git a/src/main/java/io/lettuce/core/cluster/models/partitions/Partitions.java b/src/main/java/io/lettuce/core/cluster/models/partitions/Partitions.java index 62a28d661c..842800297f 100644 --- a/src/main/java/io/lettuce/core/cluster/models/partitions/Partitions.java +++ b/src/main/java/io/lettuce/core/cluster/models/partitions/Partitions.java @@ -17,6 +17,7 @@ import java.util.*; +import io.lettuce.core.RedisURI; import io.lettuce.core.cluster.SlotHash; import io.lettuce.core.internal.LettuceAssert; @@ -61,8 +62,8 @@ public class Partitions implements Collection { /** * Retrieve a {@link RedisClusterNode} by its slot number. This method does not distinguish between masters and slaves. * - * @param slot the slot - * @return RedisClusterNode or {@literal null} + * @param slot the slot hash. + * @return the {@link RedisClusterNode} or {@literal null} if not found. */ public RedisClusterNode getPartitionBySlot(int slot) { return slotCache[slot]; @@ -71,8 +72,8 @@ public RedisClusterNode getPartitionBySlot(int slot) { /** * Retrieve a {@link RedisClusterNode} by its node id. * - * @param nodeId the nodeId - * @return RedisClusterNode or {@literal null} + * @param nodeId the nodeId. + * @return the {@link RedisClusterNode} or {@literal null} if not found. */ public RedisClusterNode getPartitionByNodeId(String nodeId) { @@ -81,9 +82,42 @@ public RedisClusterNode getPartitionByNodeId(String nodeId) { return partition; } } + return null; } + /** + * Retrieve a {@link RedisClusterNode} by its hostname/port considering node aliases. + * + * @param host hostname. + * @param port port number. + * @return the {@link RedisClusterNode} or {@literal null} if not found. + */ + public RedisClusterNode getPartition(String host, int port) { + + for (RedisClusterNode partition : nodeReadView) { + + RedisURI uri = partition.getUri(); + + if (matches(uri, host, port)) { + return partition; + } + + for (RedisURI redisURI : partition.getAliases()) { + + if (matches(redisURI, host, port)) { + return partition; + } + } + } + + return null; + } + + private static boolean matches(RedisURI uri, String host, int port) { + return uri.getPort() == port && host.equals(uri.getHost()); + } + /** * Update the partition cache. Updates are necessary after the partition details have changed. */ diff --git a/src/main/java/io/lettuce/core/cluster/models/partitions/RedisClusterNode.java b/src/main/java/io/lettuce/core/cluster/models/partitions/RedisClusterNode.java index e6e05d331b..a3c719bf87 100644 --- a/src/main/java/io/lettuce/core/cluster/models/partitions/RedisClusterNode.java +++ b/src/main/java/io/lettuce/core/cluster/models/partitions/RedisClusterNode.java @@ -49,6 +49,7 @@ public class RedisClusterNode implements Serializable, RedisNodeDescription { private BitSet slots; private final Set flags = EnumSet.noneOf(NodeFlag.class); + private final List aliases = new ArrayList<>(); public RedisClusterNode() { } @@ -79,6 +80,7 @@ public RedisClusterNode(RedisClusterNode redisClusterNode) { this.pingSentTimestamp = redisClusterNode.pingSentTimestamp; this.pongReceivedTimestamp = redisClusterNode.pongReceivedTimestamp; this.configEpoch = redisClusterNode.configEpoch; + this.aliases.addAll(redisClusterNode.aliases); if (redisClusterNode.slots != null && !redisClusterNode.slots.isEmpty()) { this.slots = new BitSet(SlotHash.SLOT_COUNT); @@ -271,6 +273,21 @@ public boolean is(NodeFlag nodeFlag) { return getFlags().contains(nodeFlag); } + /** + * Add an alias to {@link RedisClusterNode}. + * + * @param alias must not be {@literal null}. + */ + public void addAlias(RedisURI alias) { + + LettuceAssert.notNull(alias, "Alias URI must not be null"); + this.aliases.add(alias); + } + + public List getAliases() { + return aliases; + } + /** * @param slot the slot hash * @return true if the slot is contained within the handled slots. @@ -324,6 +341,7 @@ public String toString() { sb.append(", pongReceivedTimestamp=").append(pongReceivedTimestamp); sb.append(", configEpoch=").append(configEpoch); sb.append(", flags=").append(flags); + sb.append(", aliases=").append(aliases); if (slots != null) { sb.append(", slot count=").append(slots.cardinality()); } diff --git a/src/main/java/io/lettuce/core/cluster/topology/ClusterTopologyRefresh.java b/src/main/java/io/lettuce/core/cluster/topology/ClusterTopologyRefresh.java index 3c64b88490..79fd7d8029 100644 --- a/src/main/java/io/lettuce/core/cluster/topology/ClusterTopologyRefresh.java +++ b/src/main/java/io/lettuce/core/cluster/topology/ClusterTopologyRefresh.java @@ -122,15 +122,22 @@ NodeTopologyViews getNodeSpecificViews(Requests requestedTopology, Requests requ Set nodes = requestedTopology.nodes(); List views = new ArrayList<>(); - for (RedisURI node : nodes) { + for (RedisURI nodeUri : nodes) { try { - NodeTopologyView nodeTopologyView = NodeTopologyView.from(node, requestedTopology, requestedClients); + NodeTopologyView nodeTopologyView = NodeTopologyView.from(nodeUri, requestedTopology, requestedClients); if (!nodeTopologyView.isAvailable()) { continue; } + RedisClusterNode node = nodeTopologyView.getOwnPartition(); + if (node.getUri() == null) { + node.setUri(nodeUri); + } else { + node.addAlias(nodeUri); + } + List nodeWithStats = nodeTopologyView.getPartitions() // .stream() // .filter(ClusterTopologyRefresh::validNode) // @@ -140,10 +147,6 @@ NodeTopologyViews getNodeSpecificViews(Requests requestedTopology, Requests requ .filter(partition -> partition.is(RedisClusterNode.NodeFlag.MYSELF)) // .forEach(partition -> { - if (partition.getUri() == null) { - partition.setUri(node); - } - // record latency for later partition ordering latencies.put(partition.getNodeId(), nodeTopologyView.getLatency()); clientCountByNodeId.put(partition.getNodeId(), nodeTopologyView.getConnectedClients()); @@ -158,7 +161,7 @@ NodeTopologyViews getNodeSpecificViews(Requests requestedTopology, Requests requ views.add(nodeTopologyView); } catch (ExecutionException e) { - logger.warn(String.format("Cannot retrieve partition view from %s, error: %s", node, e)); + logger.warn(String.format("Cannot retrieve partition view from %s, error: %s", nodeUri, e)); } } diff --git a/src/main/java/io/lettuce/core/cluster/topology/NodeTopologyView.java b/src/main/java/io/lettuce/core/cluster/topology/NodeTopologyView.java index 48a7d69ca3..b59096de74 100644 --- a/src/main/java/io/lettuce/core/cluster/topology/NodeTopologyView.java +++ b/src/main/java/io/lettuce/core/cluster/topology/NodeTopologyView.java @@ -57,12 +57,9 @@ class NodeTopologyView { this.partitions = ClusterPartitionParser.parse(clusterNodes); this.connectedClients = getClients(clientList); - this.clusterNodes = clusterNodes; this.clientList = clientList; this.latency = latency; - - getOwnPartition().setUri(redisURI); } static NodeTopologyView from(RedisURI redisURI, Requests clusterNodesRequests, Requests clientListRequests) @@ -111,10 +108,15 @@ String getNodeId() { } RedisURI getRedisURI() { + + if (partitions.isEmpty()) { + return redisURI; + } + return getOwnPartition().getUri(); } - private RedisClusterNode getOwnPartition() { + RedisClusterNode getOwnPartition() { for (RedisClusterNode partition : partitions) { if (partition.is(RedisClusterNode.NodeFlag.MYSELF)) { return partition; diff --git a/src/main/java/io/lettuce/core/cluster/topology/NodeTopologyViews.java b/src/main/java/io/lettuce/core/cluster/topology/NodeTopologyViews.java index 1d42675c46..c65004fa3d 100644 --- a/src/main/java/io/lettuce/core/cluster/topology/NodeTopologyViews.java +++ b/src/main/java/io/lettuce/core/cluster/topology/NodeTopologyViews.java @@ -26,7 +26,7 @@ */ class NodeTopologyViews { - private List views = new ArrayList<>(); + private List views; public NodeTopologyViews(List views) { this.views = views; diff --git a/src/test/java/io/lettuce/core/cluster/models/partitions/PartitionsTest.java b/src/test/java/io/lettuce/core/cluster/models/partitions/PartitionsTest.java index 8deeb3128d..af07a6b42a 100644 --- a/src/test/java/io/lettuce/core/cluster/models/partitions/PartitionsTest.java +++ b/src/test/java/io/lettuce/core/cluster/models/partitions/PartitionsTest.java @@ -36,7 +36,7 @@ public class PartitionsTest { Arrays.asList(4, 5, 6), new HashSet<>()); @Test - public void contains() throws Exception { + public void contains() { Partitions partitions = new Partitions(); partitions.add(node1); @@ -46,7 +46,7 @@ public void contains() throws Exception { } @Test - public void containsUsesReadView() throws Exception { + public void containsUsesReadView() { Partitions partitions = new Partitions(); partitions.getPartitions().add(node1); @@ -57,7 +57,7 @@ public void containsUsesReadView() throws Exception { } @Test - public void containsAll() throws Exception { + public void containsAll() { Partitions partitions = new Partitions(); partitions.add(node1); @@ -67,7 +67,7 @@ public void containsAll() throws Exception { } @Test - public void containsAllUsesReadView() throws Exception { + public void containsAllUsesReadView() { Partitions partitions = new Partitions(); partitions.getPartitions().add(node1); @@ -78,7 +78,7 @@ public void containsAllUsesReadView() throws Exception { } @Test - public void add() throws Exception { + public void add() { Partitions partitions = new Partitions(); partitions.add(node1); @@ -89,7 +89,7 @@ public void add() throws Exception { } @Test - public void addPartitionClearsCache() throws Exception { + public void addPartitionClearsCache() { Partitions partitions = new Partitions(); partitions.addPartition(node1); @@ -98,7 +98,7 @@ public void addPartitionClearsCache() throws Exception { } @Test - public void addAll() throws Exception { + public void addAll() { Partitions partitions = new Partitions(); partitions.addAll(Arrays.asList(node1, node2)); @@ -108,7 +108,7 @@ public void addAll() throws Exception { } @Test - public void getPartitionBySlot() throws Exception { + public void getPartitionBySlot() { Partitions partitions = new Partitions(); @@ -119,7 +119,19 @@ public void getPartitionBySlot() throws Exception { } @Test - public void remove() throws Exception { + public void getPartitionByAlias() { + + Partitions partitions = new Partitions(); + node1.addAlias(RedisURI.create("foobar", 1234)); + partitions.add(node1); + + assertThat(partitions.getPartition(node1.getUri().getHost(), node1.getUri().getPort())).isEqualTo(node1); + assertThat(partitions.getPartition("foobar", 1234)).isEqualTo(node1); + assertThat(partitions.getPartition("unknown", 1234)).isNull(); + } + + @Test + public void remove() { Partitions partitions = new Partitions(); partitions.addAll(Arrays.asList(node1, node2)); @@ -130,7 +142,7 @@ public void remove() throws Exception { } @Test - public void removeAll() throws Exception { + public void removeAll() { Partitions partitions = new Partitions(); partitions.addAll(Arrays.asList(node1, node2)); @@ -141,7 +153,7 @@ public void removeAll() throws Exception { } @Test - public void clear() throws Exception { + public void clear() { Partitions partitions = new Partitions(); partitions.addAll(Arrays.asList(node1, node2)); @@ -152,7 +164,7 @@ public void clear() throws Exception { } @Test - public void retainAll() throws Exception { + public void retainAll() { Partitions partitions = new Partitions(); partitions.addAll(Arrays.asList(node1, node2)); @@ -163,7 +175,7 @@ public void retainAll() throws Exception { } @Test - public void toArray() throws Exception { + public void toArray() { Partitions partitions = new Partitions(); partitions.addAll(Arrays.asList(node1, node2)); @@ -172,7 +184,7 @@ public void toArray() throws Exception { } @Test - public void toArrayUsesReadView() throws Exception { + public void toArrayUsesReadView() { Partitions partitions = new Partitions(); partitions.getPartitions().addAll(Arrays.asList(node1, node2)); @@ -183,7 +195,7 @@ public void toArrayUsesReadView() throws Exception { } @Test - public void toArray2() throws Exception { + public void toArray2() { Partitions partitions = new Partitions(); partitions.addAll(Arrays.asList(node1, node2)); @@ -192,7 +204,7 @@ public void toArray2() throws Exception { } @Test - public void toArray2UsesReadView() throws Exception { + public void toArray2UsesReadView() { Partitions partitions = new Partitions(); partitions.getPartitions().addAll(Arrays.asList(node1, node2)); @@ -205,7 +217,7 @@ public void toArray2UsesReadView() throws Exception { } @Test - public void getPartitionByNodeId() throws Exception { + public void getPartitionByNodeId() { Partitions partitions = new Partitions(); partitions.add(node1); @@ -216,7 +228,7 @@ public void getPartitionByNodeId() throws Exception { } @Test - public void reload() throws Exception { + public void reload() { RedisClusterNode other = new RedisClusterNode(RedisURI.create("localhost", 6666), "c", true, "", 0, 0, 0, Arrays.asList(1, 2, 3, 4, 5, 6), new HashSet<>()); @@ -231,7 +243,7 @@ public void reload() throws Exception { } @Test - public void reloadEmpty() throws Exception { + public void reloadEmpty() { Partitions partitions = new Partitions(); partitions.reload(Arrays.asList()); @@ -240,7 +252,7 @@ public void reloadEmpty() throws Exception { } @Test - public void isEmpty() throws Exception { + public void isEmpty() { Partitions partitions = new Partitions(); partitions.add(node1); @@ -249,7 +261,7 @@ public void isEmpty() throws Exception { } @Test - public void size() throws Exception { + public void size() { Partitions partitions = new Partitions(); partitions.add(node1); @@ -258,7 +270,7 @@ public void size() throws Exception { } @Test - public void sizeUsesReadView() throws Exception { + public void sizeUsesReadView() { Partitions partitions = new Partitions(); partitions.getPartitions().add(node1); @@ -271,7 +283,7 @@ public void sizeUsesReadView() throws Exception { } @Test - public void getPartition() throws Exception { + public void getPartition() { Partitions partitions = new Partitions(); partitions.add(node1); @@ -280,7 +292,7 @@ public void getPartition() throws Exception { } @Test - public void iterator() throws Exception { + public void iterator() { Partitions partitions = new Partitions(); partitions.add(node1); @@ -289,7 +301,7 @@ public void iterator() throws Exception { } @Test - public void iteratorUsesReadView() throws Exception { + public void iteratorUsesReadView() { Partitions partitions = new Partitions(); partitions.getPartitions().add(node1); @@ -301,7 +313,7 @@ public void iteratorUsesReadView() throws Exception { } @Test - public void iteratorIsSafeDuringUpdate() throws Exception { + public void iteratorIsSafeDuringUpdate() { Partitions partitions = new Partitions(); partitions.add(node1); @@ -325,7 +337,7 @@ public void iteratorIsSafeDuringUpdate() throws Exception { } @Test - public void testToString() throws Exception { + public void testToString() { Partitions partitions = new Partitions(); partitions.add(node1); diff --git a/src/test/java/io/lettuce/core/cluster/models/partitions/RedisClusterNodeTest.java b/src/test/java/io/lettuce/core/cluster/models/partitions/RedisClusterNodeTest.java index 05f121b589..19f0c8bcdb 100644 --- a/src/test/java/io/lettuce/core/cluster/models/partitions/RedisClusterNodeTest.java +++ b/src/test/java/io/lettuce/core/cluster/models/partitions/RedisClusterNodeTest.java @@ -34,12 +34,14 @@ public void shouldCopyNode() { RedisClusterNode node = new RedisClusterNode(); node.setSlots(Arrays.asList(1, 2, 3, SlotHash.SLOT_COUNT - 1)); + node.addAlias(RedisURI.create("foo", 6379)); RedisClusterNode copy = new RedisClusterNode(node); assertThat(copy.getSlots()).containsExactly(1, 2, 3, SlotHash.SLOT_COUNT - 1); assertThat(copy.hasSlot(1)).isTrue(); assertThat(copy.hasSlot(SlotHash.SLOT_COUNT - 1)).isTrue(); + assertThat(copy.getAliases()).contains(RedisURI.create("foo", 6379)); } @Test diff --git a/src/test/java/io/lettuce/core/cluster/topology/NodeTopologyViewsTest.java b/src/test/java/io/lettuce/core/cluster/topology/NodeTopologyViewsTest.java index c4fa57bac4..bf7b8bcf48 100644 --- a/src/test/java/io/lettuce/core/cluster/topology/NodeTopologyViewsTest.java +++ b/src/test/java/io/lettuce/core/cluster/topology/NodeTopologyViewsTest.java @@ -30,9 +30,9 @@ public class NodeTopologyViewsTest { @Test - public void shouldReuseKnownUris() throws Exception { + public void shouldReuseKnownUris() { - RedisURI localhost = RedisURI.create("localhost", 6479); + RedisURI localhost = RedisURI.create("127.0.0.1", 6479); RedisURI otherhost = RedisURI.create("127.0.0.2", 7000); RedisURI host3 = RedisURI.create("127.0.0.3", 7000); @@ -41,7 +41,7 @@ public void shouldReuseKnownUris() throws Exception { + "2 127.0.0.2:7000 master - 111 1401258245007 222 connected 7000 12000 12002-16383\n" + "3 127.0.0.3:7000 master - 111 1401258245007 222 connected 7000 12000 12002-16383\n"; - String viewByOtherhost = "1 127.0.0.1:6479 master - 0 1401258245007 2 connected 8000-11999\n" + String viewByOtherhost = "1 127.0.0.2:6479 master - 0 1401258245007 2 connected 8000-11999\n" + "2 127.0.0.2:7000 master,myself - 111 1401258245007 222 connected 7000 12000 12002-16383\n" + "3 127.0.0.3:7000 master - 111 1401258245007 222 connected 7000 12000 12002-16383\n"; @@ -55,12 +55,12 @@ public void shouldReuseKnownUris() throws Exception { } @Test(expected = IllegalStateException.class) - public void shouldFailWithoutOwnPartition() throws Exception { + public void shouldFailWithoutOwnPartition() { - RedisURI localhost = RedisURI.create("localhost", 6479); + RedisURI localhost = RedisURI.create("127.0.0.1", 6479); String viewByLocalhost = "1 127.0.0.1:6479 master - 0 1401258245007 2 connected 8000-11999\n"; - new NodeTopologyView(localhost, viewByLocalhost, "", 0); + new NodeTopologyView(localhost, viewByLocalhost, "", 0).getOwnPartition(); } }