Skip to content

Commit

Permalink
Use slot details of CLUSTER SLOTS in ClusterSlotsParser #183
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Jan 29, 2016
1 parent b0dc59a commit 411fec3
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package com.lambdaworks.redis.cluster.models.slots;

import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.*;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;
Expand Down Expand Up @@ -34,6 +33,7 @@ private ClusterSlotsParser() {
*/
public static List<ClusterSlotRange> parse(List<?> clusterSlotsOutput) {
List<ClusterSlotRange> result = Lists.newArrayList();
Map<String, RedisClusterNode> nodeCache = Maps.newHashMap();

for (Object o : clusterSlotsOutput) {

Expand All @@ -46,7 +46,7 @@ public static List<ClusterSlotRange> parse(List<?> clusterSlotsOutput) {
continue;
}

ClusterSlotRange clusterSlotRange = parseRange(range);
ClusterSlotRange clusterSlotRange = parseRange(range, nodeCache);
result.add(clusterSlotRange);
}

Expand All @@ -60,7 +60,7 @@ public int compare(ClusterSlotRange o1, ClusterSlotRange o2) {
return Collections.unmodifiableList(result);
}

private static ClusterSlotRange parseRange(List<?> range) {
private static ClusterSlotRange parseRange(List<?> range, Map<String, RedisClusterNode> nodeCache) {
Iterator<?> iterator = range.iterator();

int from = Ints.checkedCast(getLongFromIterator(iterator, 0));
Expand All @@ -69,14 +69,17 @@ private static ClusterSlotRange parseRange(List<?> range) {

List<RedisClusterNode> slaves = Lists.newArrayList();
if (iterator.hasNext()) {
master = getRedisClusterNode(iterator);
master = getRedisClusterNode(iterator, nodeCache);
if(master != null) {
master.setFlags(Collections.singleton(RedisClusterNode.NodeFlag.MASTER));
Set<Integer> slots = Sets.newTreeSet(master.getSlots());
slots.addAll(createSlots(from, to));
master.setSlots(Lists.newArrayList(slots));
}
}

while (iterator.hasNext()) {
RedisClusterNode slave = getRedisClusterNode(iterator);
RedisClusterNode slave = getRedisClusterNode(iterator, nodeCache);
if (slave != null) {
slave.setSlaveOf(master.getNodeId());
slave.setFlags(Collections.singleton(RedisClusterNode.NodeFlag.SLAVE));
Expand All @@ -87,8 +90,17 @@ private static ClusterSlotRange parseRange(List<?> range) {
return new ClusterSlotRange(from, to, master, Collections.unmodifiableList(slaves));
}

private static RedisClusterNode getRedisClusterNode(Iterator<?> iterator) {
private static List<Integer> createSlots(int from, int to) {
List<Integer> slots = Lists.newArrayList();
for (int i = from; i < to + 1; i++) {
slots.add(i);
}
return slots;
}

private static RedisClusterNode getRedisClusterNode(Iterator<?> iterator, Map<String, RedisClusterNode> nodeCache) {
Object element = iterator.next();
RedisClusterNode redisClusterNode = null;
if (element instanceof List) {
List<?> hostAndPortList = (List<?>) element;
if (hostAndPortList.size() < 2) {
Expand All @@ -100,18 +112,33 @@ private static RedisClusterNode getRedisClusterNode(Iterator<?> iterator) {
int port = Ints.checkedCast(getLongFromIterator(hostAndPortIterator, 0));
String nodeId;

RedisClusterNode redisClusterNode = new RedisClusterNode();
redisClusterNode.setUri(new RedisURI.Builder().redis(host, port).build());

if (hostAndPortIterator.hasNext()) {
nodeId = (String) hostAndPortIterator.next();
redisClusterNode.setNodeId(nodeId);
}

return redisClusterNode;

redisClusterNode = nodeCache.get(nodeId);
if(redisClusterNode == null) {
redisClusterNode = createNode(host, port);
nodeCache.put(nodeId, redisClusterNode);
redisClusterNode.setNodeId(nodeId);
}
}
else {
String key = host + ":" + port;
redisClusterNode = nodeCache.get(key);
if(redisClusterNode == null) {
redisClusterNode = createNode(host, port);
nodeCache.put(key, redisClusterNode);
}
}
}
return null;
return redisClusterNode;
}

private static RedisClusterNode createNode(String host, int port) {
RedisClusterNode redisClusterNode = new RedisClusterNode();
redisClusterNode.setUri(new RedisURI.Builder().redis(host, port).build());
redisClusterNode.setSlots(new ArrayList<Integer>());
return redisClusterNode;
}

private static long getLongFromIterator(Iterator<?> iterator, long defaultValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void testParse() throws Exception {

@Test
public void testParseWithSlave() throws Exception {
List<?> list = ImmutableList.of(Lists.newArrayList("0", "1", Lists.newArrayList("1", "2", "nodeId1"), Lists.newArrayList("1", 2, "nodeId2")));
List<?> list = ImmutableList.of(Lists.newArrayList("100", "200", Lists.newArrayList("1", "2", "nodeId1"), Lists.newArrayList("1", 2, "nodeId2")));
List<ClusterSlotRange> result = ClusterSlotsParser.parse(list);
assertThat(result).hasSize(1);
ClusterSlotRange clusterSlotRange = result.get(0);
Expand All @@ -60,6 +60,10 @@ public void testParseWithSlave() throws Exception {
assertThat(masterNode.getUri().getHost()).isEqualTo("1");
assertThat(masterNode.getUri().getPort()).isEqualTo(2);
assertThat(masterNode.getFlags()).contains(RedisClusterNode.NodeFlag.MASTER);
assertThat(masterNode.getSlots()).contains(100, 101, 199, 200);
assertThat(masterNode.getSlots()).doesNotContain(99, 201);
assertThat(masterNode.getSlots()).hasSize(101);


assertThat(clusterSlotRange.getSlaves()).hasSize(1);
assertThat(clusterSlotRange.getSlaveNodes()).hasSize(1);
Expand All @@ -75,6 +79,27 @@ public void testParseWithSlave() throws Exception {
assertThat(slaveNode.getFlags()).contains(RedisClusterNode.NodeFlag.SLAVE);
}

@Test
public void testSameNode() throws Exception {
List<?> list = ImmutableList.of(Lists.newArrayList("100", "200", Lists.newArrayList("1", "2", "nodeId1"), Lists.newArrayList("1", 2, "nodeId2")),
Lists.newArrayList("200", "300", Lists.newArrayList("1", "2", "nodeId1"), Lists.newArrayList("1", 2, "nodeId2")));

List<ClusterSlotRange> result = ClusterSlotsParser.parse(list);
assertThat(result).hasSize(2);

assertThat(result.get(0).getMasterNode()).isSameAs(result.get(1).getMasterNode());

RedisClusterNode masterNode = result.get(0).getMasterNode();
assertThat(masterNode).isNotNull();
assertThat(masterNode.getNodeId()).isEqualTo("nodeId1");
assertThat(masterNode.getUri().getHost()).isEqualTo("1");
assertThat(masterNode.getUri().getPort()).isEqualTo(2);
assertThat(masterNode.getFlags()).contains(RedisClusterNode.NodeFlag.MASTER);
assertThat(masterNode.getSlots()).contains(100, 101, 199, 200, 203);
assertThat(masterNode.getSlots()).doesNotContain(99, 301);
assertThat(masterNode.getSlots()).hasSize(201);
}

@Test
public void testHostAndPortConstructor() throws Exception {

Expand Down

0 comments on commit 411fec3

Please sign in to comment.