Skip to content

Commit

Permalink
Add alias to RedisClusterNode #712
Browse files Browse the repository at this point in the history
Lettuce now provides a cluster node alias to better cope with load balancers or alternate cluster names. We are using the Redis reported hostname as URI (was previously the seed node URI) and add the seed node URI as alias. This way we make sure to connect the appropriate host directly without connecting the balancer and receiving redirects.

Redirects can happen in an environment where the load balancer forwards connections to random cluster master nodes.
  • Loading branch information
mp911de committed Mar 19, 2018
1 parent 62adb56 commit d3b7070
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void punsubscribed(K pattern, long count) {
}

private RedisClusterNode getNode() {
return nodeId != null ? getPartitions().getPartitionByNodeId(nodeId) : getPartition(host, port);
return nodeId != null ? getPartitions().getPartitionByNodeId(nodeId) : getPartitions().getPartition(host, port);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ private void beforeGetConnection(Intent intent, String host, int port) {
}

if (validateClusterNodeMembership()) {
RedisClusterNode redisClusterNode = getPartition(host, port);
RedisClusterNode redisClusterNode = partitions.getPartition(host, port);

if (redisClusterNode == null) {
HostAndPort hostAndPort = HostAndPort.of(host, port);
Expand All @@ -388,17 +388,6 @@ private void beforeGetConnection(Intent intent, String host, int port) {
}
}

protected RedisClusterNode getPartition(String host, int port) {

for (RedisClusterNode partition : partitions) {
RedisURI uri = partition.getUri();
if (port == uri.getPort() && host.equals(uri.getHost())) {
return partition;
}
}
return null;
}

@Override
public void close() {
closeAsync().join();
Expand Down Expand Up @@ -476,7 +465,7 @@ private boolean isStale(ConnectionKey connectionKey) {
return false;
}

if (connectionKey.host != null && getPartition(connectionKey.host, connectionKey.port) != null) {
if (connectionKey.host != null && partitions.getPartition(connectionKey.host, connectionKey.port) != null) {
return false;
}

Expand Down Expand Up @@ -581,7 +570,7 @@ public ConnectionFuture<StatefulRedisConnection<K, V>> apply(ConnectionKey key)
if (key.host != null) {

if (validateClusterNodeMembership()) {
if (getPartition(key.host, key.port) == null) {
if (partitions.getPartition(key.host, key.port) == null) {
throw invalidConnectionPoint(key.host + ":" + key.port);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.*;

import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.internal.LettuceAssert;

Expand Down Expand Up @@ -61,8 +62,8 @@ public class Partitions implements Collection<RedisClusterNode> {
/**
* Retrieve a {@link RedisClusterNode} by its slot number. This method does not distinguish between masters and slaves.
*
* @param slot the slot
* @return RedisClusterNode or {@literal null}
* @param slot the slot hash.
* @return the {@link RedisClusterNode} or {@literal null} if not found.
*/
public RedisClusterNode getPartitionBySlot(int slot) {
return slotCache[slot];
Expand All @@ -71,8 +72,8 @@ public RedisClusterNode getPartitionBySlot(int slot) {
/**
* Retrieve a {@link RedisClusterNode} by its node id.
*
* @param nodeId the nodeId
* @return RedisClusterNode or {@literal null}
* @param nodeId the nodeId.
* @return the {@link RedisClusterNode} or {@literal null} if not found.
*/
public RedisClusterNode getPartitionByNodeId(String nodeId) {

Expand All @@ -81,9 +82,42 @@ public RedisClusterNode getPartitionByNodeId(String nodeId) {
return partition;
}
}

return null;
}

/**
* Retrieve a {@link RedisClusterNode} by its hostname/port considering node aliases.
*
* @param host hostname.
* @param port port number.
* @return the {@link RedisClusterNode} or {@literal null} if not found.
*/
public RedisClusterNode getPartition(String host, int port) {

for (RedisClusterNode partition : nodeReadView) {

RedisURI uri = partition.getUri();

if (matches(uri, host, port)) {
return partition;
}

for (RedisURI redisURI : partition.getAliases()) {

if (matches(redisURI, host, port)) {
return partition;
}
}
}

return null;
}

private static boolean matches(RedisURI uri, String host, int port) {
return uri.getPort() == port && host.equals(uri.getHost());
}

/**
* Update the partition cache. Updates are necessary after the partition details have changed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class RedisClusterNode implements Serializable, RedisNodeDescription {

private BitSet slots;
private final Set<NodeFlag> flags = EnumSet.noneOf(NodeFlag.class);
private final List<RedisURI> aliases = new ArrayList<>();

public RedisClusterNode() {
}
Expand Down Expand Up @@ -79,6 +80,7 @@ public RedisClusterNode(RedisClusterNode redisClusterNode) {
this.pingSentTimestamp = redisClusterNode.pingSentTimestamp;
this.pongReceivedTimestamp = redisClusterNode.pongReceivedTimestamp;
this.configEpoch = redisClusterNode.configEpoch;
this.aliases.addAll(redisClusterNode.aliases);

if (redisClusterNode.slots != null && !redisClusterNode.slots.isEmpty()) {
this.slots = new BitSet(SlotHash.SLOT_COUNT);
Expand Down Expand Up @@ -271,6 +273,21 @@ public boolean is(NodeFlag nodeFlag) {
return getFlags().contains(nodeFlag);
}

/**
* Add an alias to {@link RedisClusterNode}.
*
* @param alias must not be {@literal null}.
*/
public void addAlias(RedisURI alias) {

LettuceAssert.notNull(alias, "Alias URI must not be null");
this.aliases.add(alias);
}

public List<RedisURI> getAliases() {
return aliases;
}

/**
* @param slot the slot hash
* @return true if the slot is contained within the handled slots.
Expand Down Expand Up @@ -324,6 +341,7 @@ public String toString() {
sb.append(", pongReceivedTimestamp=").append(pongReceivedTimestamp);
sb.append(", configEpoch=").append(configEpoch);
sb.append(", flags=").append(flags);
sb.append(", aliases=").append(aliases);
if (slots != null) {
sb.append(", slot count=").append(slots.cardinality());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,22 @@ NodeTopologyViews getNodeSpecificViews(Requests requestedTopology, Requests requ
Set<RedisURI> nodes = requestedTopology.nodes();

List<NodeTopologyView> views = new ArrayList<>();
for (RedisURI node : nodes) {
for (RedisURI nodeUri : nodes) {

try {
NodeTopologyView nodeTopologyView = NodeTopologyView.from(node, requestedTopology, requestedClients);
NodeTopologyView nodeTopologyView = NodeTopologyView.from(nodeUri, requestedTopology, requestedClients);

if (!nodeTopologyView.isAvailable()) {
continue;
}

RedisClusterNode node = nodeTopologyView.getOwnPartition();
if (node.getUri() == null) {
node.setUri(nodeUri);
} else {
node.addAlias(nodeUri);
}

List<RedisClusterNodeSnapshot> nodeWithStats = nodeTopologyView.getPartitions() //
.stream() //
.filter(ClusterTopologyRefresh::validNode) //
Expand All @@ -140,10 +147,6 @@ NodeTopologyViews getNodeSpecificViews(Requests requestedTopology, Requests requ
.filter(partition -> partition.is(RedisClusterNode.NodeFlag.MYSELF)) //
.forEach(partition -> {

if (partition.getUri() == null) {
partition.setUri(node);
}

// record latency for later partition ordering
latencies.put(partition.getNodeId(), nodeTopologyView.getLatency());
clientCountByNodeId.put(partition.getNodeId(), nodeTopologyView.getConnectedClients());
Expand All @@ -158,7 +161,7 @@ NodeTopologyViews getNodeSpecificViews(Requests requestedTopology, Requests requ

views.add(nodeTopologyView);
} catch (ExecutionException e) {
logger.warn(String.format("Cannot retrieve partition view from %s, error: %s", node, e));
logger.warn(String.format("Cannot retrieve partition view from %s, error: %s", nodeUri, e));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,9 @@ class NodeTopologyView {

this.partitions = ClusterPartitionParser.parse(clusterNodes);
this.connectedClients = getClients(clientList);

this.clusterNodes = clusterNodes;
this.clientList = clientList;
this.latency = latency;

getOwnPartition().setUri(redisURI);
}

static NodeTopologyView from(RedisURI redisURI, Requests clusterNodesRequests, Requests clientListRequests)
Expand Down Expand Up @@ -111,10 +108,15 @@ String getNodeId() {
}

RedisURI getRedisURI() {

if (partitions.isEmpty()) {
return redisURI;
}

return getOwnPartition().getUri();
}

private RedisClusterNode getOwnPartition() {
RedisClusterNode getOwnPartition() {
for (RedisClusterNode partition : partitions) {
if (partition.is(RedisClusterNode.NodeFlag.MYSELF)) {
return partition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*/
class NodeTopologyViews {

private List<NodeTopologyView> views = new ArrayList<>();
private List<NodeTopologyView> views;

public NodeTopologyViews(List<NodeTopologyView> views) {
this.views = views;
Expand Down
Loading

0 comments on commit d3b7070

Please sign in to comment.