diff --git a/src/main/java/io/lettuce/core/cluster/models/partitions/ClusterPartitionParser.java b/src/main/java/io/lettuce/core/cluster/models/partitions/ClusterPartitionParser.java index f174e35f60..40fb501b57 100644 --- a/src/main/java/io/lettuce/core/cluster/models/partitions/ClusterPartitionParser.java +++ b/src/main/java/io/lettuce/core/cluster/models/partitions/ClusterPartitionParser.java @@ -16,12 +16,11 @@ package io.lettuce.core.cluster.models.partitions; import java.util.*; -import java.util.regex.Pattern; -import java.util.stream.Collectors; import io.lettuce.core.LettuceStrings; import io.lettuce.core.RedisException; import io.lettuce.core.RedisURI; +import io.lettuce.core.cluster.SlotHash; import io.lettuce.core.internal.HostAndPort; import io.lettuce.core.internal.LettuceLists; @@ -37,9 +36,6 @@ public class ClusterPartitionParser { private static final String TOKEN_SLOT_IN_TRANSITION = "["; private static final char TOKEN_NODE_SEPARATOR = '\n'; - private static final Pattern TOKEN_PATTERN = Pattern.compile(Character.toString(TOKEN_NODE_SEPARATOR)); - private static final Pattern SPACE_PATTERN = Pattern.compile(" "); - private static final Pattern DASH_PATTERN = Pattern.compile("\\-"); private static final Map FLAG_MAPPING; static { @@ -74,8 +70,18 @@ public static Partitions parse(String nodes) { Partitions result = new Partitions(); try { - List mappedNodes = TOKEN_PATTERN.splitAsStream(nodes).filter(s -> !s.isEmpty()) - .map(ClusterPartitionParser::parseNode).collect(Collectors.toList()); + + String[] lines = nodes.split(Character.toString(TOKEN_NODE_SEPARATOR)); + List mappedNodes = new ArrayList<>(lines.length); + + for (String line : lines) { + + if (line.isEmpty()) { + continue; + + } + mappedNodes.add(ClusterPartitionParser.parseNode(line)); + } result.addAll(mappedNodes); } catch (Exception e) { throw new RedisException("Cannot parse " + nodes, e); @@ -86,7 +92,7 @@ public static Partitions parse(String nodes) { private static RedisClusterNode parseNode(String nodeInformation) { - Iterator iterator = SPACE_PATTERN.splitAsStream(nodeInformation).iterator(); + Iterator iterator = Arrays.asList(nodeInformation.split(" ")).iterator(); String nodeId = iterator.next(); boolean connected = false; @@ -122,7 +128,7 @@ private static RedisClusterNode parseNode(String nodeInformation) { } List slotStrings = LettuceLists.newList(iterator); // slot, from-to [slot->-nodeID] [slot-<-nodeID] - List slots = readSlots(slotStrings); + BitSet slots = readSlots(slotStrings); RedisClusterNode partition = new RedisClusterNode(uri, nodeId, connected, replicaOf, pingSentTs, pongReceivedTs, configEpoch, slots, nodeFlags); @@ -147,9 +153,9 @@ private static Set readFlags(List flagStrings return Collections.unmodifiableSet(flags); } - private static List readSlots(List slotStrings) { + private static BitSet readSlots(List slotStrings) { - List slots = new ArrayList<>(); + BitSet slots = new BitSet(SlotHash.SLOT_COUNT); for (String slotString : slotStrings) { if (slotString.startsWith(TOKEN_SLOT_IN_TRANSITION)) { @@ -160,21 +166,21 @@ private static List readSlots(List slotStrings) { if (slotString.contains("-")) { // slot range - Iterator it = DASH_PATTERN.splitAsStream(slotString).iterator(); + Iterator it = Arrays.asList(slotString.split("\\-")).iterator(); int from = Integer.parseInt(it.next()); int to = Integer.parseInt(it.next()); for (int slot = from; slot <= to; slot++) { - slots.add(slot); + slots.set(slot); } continue; } - slots.add(Integer.parseInt(slotString)); + slots.set(Integer.parseInt(slotString)); } - return Collections.unmodifiableList(slots); + return slots; } private static long getLongFromIterator(Iterator iterator, long defaultValue) { diff --git a/src/main/java/io/lettuce/core/cluster/models/partitions/Partitions.java b/src/main/java/io/lettuce/core/cluster/models/partitions/Partitions.java index 03842de8f9..ba69e73bae 100644 --- a/src/main/java/io/lettuce/core/cluster/models/partitions/Partitions.java +++ b/src/main/java/io/lettuce/core/cluster/models/partitions/Partitions.java @@ -158,9 +158,7 @@ public void updateCache() { for (RedisClusterNode partition : partitions) { readView.add(partition); - for (Integer integer : partition.getSlots()) { - slotCache[integer.intValue()] = partition; - } + partition.forEachSlot(i -> slotCache[i] = partition); } this.slotCache = slotCache; diff --git a/src/main/java/io/lettuce/core/cluster/models/partitions/RedisClusterNode.java b/src/main/java/io/lettuce/core/cluster/models/partitions/RedisClusterNode.java index e4615c61af..b12aa8d972 100644 --- a/src/main/java/io/lettuce/core/cluster/models/partitions/RedisClusterNode.java +++ b/src/main/java/io/lettuce/core/cluster/models/partitions/RedisClusterNode.java @@ -17,6 +17,7 @@ import java.io.Serializable; import java.util.*; +import java.util.function.IntConsumer; import io.lettuce.core.RedisURI; import io.lettuce.core.cluster.SlotHash; @@ -70,6 +71,23 @@ public RedisClusterNode(RedisURI uri, String nodeId, boolean connected, String s setFlags(flags); } + RedisClusterNode(RedisURI uri, String nodeId, boolean connected, String slaveOf, long pingSentTimestamp, + long pongReceivedTimestamp, long configEpoch, BitSet slots, Set flags) { + + this.uri = uri; + this.nodeId = nodeId; + this.connected = connected; + this.slaveOf = slaveOf; + this.pingSentTimestamp = pingSentTimestamp; + this.pongReceivedTimestamp = pongReceivedTimestamp; + this.configEpoch = configEpoch; + + this.slots = new BitSet(slots.length()); + this.slots.or(slots); + + setFlags(flags); + } + public RedisClusterNode(RedisClusterNode redisClusterNode) { LettuceAssert.notNull(redisClusterNode, "RedisClusterNode must not be null"); @@ -225,6 +243,28 @@ public List getSlots() { return slots; } + /** + * Performs the given action for each slot of this {@link RedisClusterNode} until all elements have been processed or the + * action throws an exception. Unless otherwise specified by the implementing class, actions are performed in the order of + * iteration (if an iteration order is specified). Exceptions thrown by the action are relayed to the caller. + * + * @param consumer + * @since 5.2 + */ + public void forEachSlot(IntConsumer consumer) { + + if (slots == null || slots.isEmpty()) { + return; + } + + for (int i = 0; i < this.slots.length(); i++) { + + if (this.slots.get(i)) { + consumer.accept(i); + } + } + } + /** * Sets the list of slots for which this {@link RedisClusterNode} is the * {@link io.lettuce.core.cluster.models.partitions.RedisClusterNode.NodeFlag#MASTER}. The list is empty if this node is not diff --git a/src/main/java/io/lettuce/core/cluster/topology/ClusterTopologyRefresh.java b/src/main/java/io/lettuce/core/cluster/topology/ClusterTopologyRefresh.java index 0cf21c8825..95fb57e366 100644 --- a/src/main/java/io/lettuce/core/cluster/topology/ClusterTopologyRefresh.java +++ b/src/main/java/io/lettuce/core/cluster/topology/ClusterTopologyRefresh.java @@ -184,19 +184,22 @@ NodeTopologyViews getNodeSpecificViews(Requests requestedTopology, Requests requ node.addAlias(nodeUri); } - List nodeWithStats = nodeTopologyView.getPartitions() // - .stream() // - .filter(ClusterTopologyRefresh::validNode) // - .map(RedisClusterNodeSnapshot::new).collect(Collectors.toList()); + List nodeWithStats = new ArrayList<>(nodeTopologyView.getPartitions().size()); - nodeWithStats.stream() // - .filter(partition -> partition.is(RedisClusterNode.NodeFlag.MYSELF)) // - .forEach(partition -> { + for (RedisClusterNode partition : nodeTopologyView.getPartitions()) { + + if (validNode(partition)) { + RedisClusterNodeSnapshot redisClusterNodeSnapshot = new RedisClusterNodeSnapshot(partition); + nodeWithStats.add(redisClusterNodeSnapshot); + + if (partition.is(RedisClusterNode.NodeFlag.MYSELF)) { // record latency for later partition ordering - latencies.put(partition.getNodeId(), nodeTopologyView.getLatency()); - clientCountByNodeId.put(partition.getNodeId(), nodeTopologyView.getConnectedClients()); - }); + latencies.put(partition.getNodeId(), nodeTopologyView.getLatency()); + clientCountByNodeId.put(partition.getNodeId(), nodeTopologyView.getConnectedClients()); + } + } + } allNodes.addAll(nodeWithStats); @@ -318,8 +321,22 @@ public RedisURI getViewedBy(Map map, Partitions partitions private static Set difference(Set set1, Set set2) { - Set result = set1.stream().filter(e -> !set2.contains(e)).collect(Collectors.toSet()); - result.addAll(set2.stream().filter(e -> !set1.contains(e)).collect(Collectors.toList())); + Set result = new HashSet<>(set1.size()); + + for (E e1 : set1) { + if (!set2.contains(e1)) { + result.add(e1); + } + } + + List list = new ArrayList<>(set2.size()); + for (E e : set2) { + if (!set1.contains(e)) { + list.add(e); + } + } + + result.addAll(list); return result; }