Reduce object allocations in ClusterTopologyParsing #1069
Cluster topology refresh now has reduced object allocation overhead by using a BitSet for the initial parsing of slots.

Topology object iteration also uses plain loops again instead of the Java 8 Stream API.
mp911de committed Jun 26, 2019
1 parent 6b0752b commit 135b330
Showing 4 changed files with 91 additions and 30 deletions.
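To illustrate the change described in the commit message, here is a minimal standalone sketch (not Lettuce code) of why a BitSet beats a List<Integer> for slot bookkeeping: a slot range from CLUSTER NODES becomes bits in one fixed-size bit set instead of thousands of boxed Integer objects. The SLOT_COUNT constant and the sample range are assumptions made for this example; in Lettuce the count comes from SlotHash.SLOT_COUNT.

import java.util.BitSet;

public class SlotBitSetSketch {

    // Redis Cluster has 16384 hash slots (SlotHash.SLOT_COUNT in Lettuce).
    static final int SLOT_COUNT = 16384;

    public static void main(String[] args) {

        // A slot range as it appears in CLUSTER NODES output, e.g. "0-5460".
        String slotRange = "0-5460";
        String[] fromTo = slotRange.split("-");
        int from = Integer.parseInt(fromTo[0]);
        int to = Integer.parseInt(fromTo[1]);

        // One BitSet (a few KB of longs) instead of 5461 boxed Integers in an ArrayList.
        BitSet slots = new BitSet(SLOT_COUNT);
        slots.set(from, to + 1); // from inclusive, to + 1 exclusive

        System.out.println(slots.cardinality()); // 5461
        System.out.println(slots.get(100));      // true
    }
}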
ClusterPartitionParser.java
@@ -16,12 +16,11 @@
package io.lettuce.core.cluster.models.partitions;

import java.util.*;
- import java.util.regex.Pattern;
- import java.util.stream.Collectors;

import io.lettuce.core.LettuceStrings;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisURI;
+ import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.internal.HostAndPort;
import io.lettuce.core.internal.LettuceLists;

@@ -37,9 +36,6 @@ public class ClusterPartitionParser {

private static final String TOKEN_SLOT_IN_TRANSITION = "[";
private static final char TOKEN_NODE_SEPARATOR = '\n';
- private static final Pattern TOKEN_PATTERN = Pattern.compile(Character.toString(TOKEN_NODE_SEPARATOR));
- private static final Pattern SPACE_PATTERN = Pattern.compile(" ");
- private static final Pattern DASH_PATTERN = Pattern.compile("\\-");
private static final Map<String, RedisClusterNode.NodeFlag> FLAG_MAPPING;

static {
@@ -74,8 +70,18 @@ public static Partitions parse(String nodes) {
Partitions result = new Partitions();

try {
- List<RedisClusterNode> mappedNodes = TOKEN_PATTERN.splitAsStream(nodes).filter(s -> !s.isEmpty())
- .map(ClusterPartitionParser::parseNode).collect(Collectors.toList());
+
+ String[] lines = nodes.split(Character.toString(TOKEN_NODE_SEPARATOR));
+ List<RedisClusterNode> mappedNodes = new ArrayList<>(lines.length);
+
+ for (String line : lines) {
+
+ if (line.isEmpty()) {
+ continue;
+
+ }
+ mappedNodes.add(ClusterPartitionParser.parseNode(line));
+ }
result.addAll(mappedNodes);
} catch (Exception e) {
throw new RedisException("Cannot parse " + nodes, e);
@@ -86,7 +92,7 @@ public static Partitions parse(String nodes) {

private static RedisClusterNode parseNode(String nodeInformation) {

- Iterator<String> iterator = SPACE_PATTERN.splitAsStream(nodeInformation).iterator();
+ Iterator<String> iterator = Arrays.asList(nodeInformation.split(" ")).iterator();

String nodeId = iterator.next();
boolean connected = false;
@@ -122,7 +128,7 @@ private static RedisClusterNode parseNode(String nodeInformation) {
}

List<String> slotStrings = LettuceLists.newList(iterator); // slot, from-to [slot->-nodeID] [slot-<-nodeID]
- List<Integer> slots = readSlots(slotStrings);
+ BitSet slots = readSlots(slotStrings);

RedisClusterNode partition = new RedisClusterNode(uri, nodeId, connected, replicaOf, pingSentTs, pongReceivedTs,
configEpoch, slots, nodeFlags);
@@ -147,9 +153,9 @@ private static Set<RedisClusterNode.NodeFlag> readFlags(List<String> flagStrings
return Collections.unmodifiableSet(flags);
}

- private static List<Integer> readSlots(List<String> slotStrings) {
+ private static BitSet readSlots(List<String> slotStrings) {

- List<Integer> slots = new ArrayList<>();
+ BitSet slots = new BitSet(SlotHash.SLOT_COUNT);
for (String slotString : slotStrings) {

if (slotString.startsWith(TOKEN_SLOT_IN_TRANSITION)) {
@@ -160,21 +166,21 @@ private static List<Integer> readSlots(List<String> slotStrings) {

if (slotString.contains("-")) {
// slot range
- Iterator<String> it = DASH_PATTERN.splitAsStream(slotString).iterator();
+ Iterator<String> it = Arrays.asList(slotString.split("\\-")).iterator();
int from = Integer.parseInt(it.next());
int to = Integer.parseInt(it.next());

for (int slot = from; slot <= to; slot++) {
- slots.add(slot);
+ slots.set(slot);

}
continue;
}

- slots.add(Integer.parseInt(slotString));
+ slots.set(Integer.parseInt(slotString));
}

- return Collections.unmodifiableList(slots);
+ return slots;
}

private static long getLongFromIterator(Iterator<?> iterator, long defaultValue) {
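Below is a hedged usage sketch of the parser changed above. The CLUSTER NODES line is made up for illustration; the field order (node id, address, flags, replica-of, ping-sent, pong-received, config-epoch, link-state, slot ranges) follows what parseNode reads.

import io.lettuce.core.cluster.models.partitions.ClusterPartitionParser;
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;

public class ParseSketch {

    public static void main(String[] args) {

        // Hypothetical single-node CLUSTER NODES output.
        String nodes = "c37ab8396be428403d4e55c0d317348be27ed973 127.0.0.1:7381 "
                + "master - 0 1449160058028 3 connected 12288-16383\n";

        Partitions partitions = ClusterPartitionParser.parse(nodes);

        for (RedisClusterNode node : partitions) {
            // getSlots() still returns boxed Integers; internally the slots are now a BitSet.
            System.out.println(node.getNodeId() + " -> " + node.getSlots().size() + " slots");
        }
    }
}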
Partitions.java
@@ -158,9 +158,7 @@ public void updateCache() {
for (RedisClusterNode partition : partitions) {

readView.add(partition);
- for (Integer integer : partition.getSlots()) {
- slotCache[integer.intValue()] = partition;
- }
+ partition.forEachSlot(i -> slotCache[i] = partition);
}

this.slotCache = slotCache;
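For context on the updateCache change above, a small hedged sketch of what the slot cache enables: an array indexed by slot gives O(1) key-to-node lookup, and the new forEachSlot fills it without allocating a List<Integer>. SlotHash.getSlot is assumed from Lettuce's public API and is not part of this diff.

import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;

class SlotCacheSketch {

    // One entry per hash slot; filled without boxing via forEachSlot.
    private final RedisClusterNode[] slotCache = new RedisClusterNode[SlotHash.SLOT_COUNT];

    void fill(Iterable<RedisClusterNode> partitions) {
        for (RedisClusterNode partition : partitions) {
            partition.forEachSlot(i -> slotCache[i] = partition);
        }
    }

    RedisClusterNode nodeFor(String key) {
        // SlotHash.getSlot computes the CRC16-based hash slot of a key.
        return slotCache[SlotHash.getSlot(key)];
    }
}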
RedisClusterNode.java
@@ -17,6 +17,7 @@

import java.io.Serializable;
import java.util.*;
+ import java.util.function.IntConsumer;

import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.SlotHash;
@@ -70,6 +71,23 @@ public RedisClusterNode(RedisURI uri, String nodeId, boolean connected, String s
setFlags(flags);
}

+ RedisClusterNode(RedisURI uri, String nodeId, boolean connected, String slaveOf, long pingSentTimestamp,
+ long pongReceivedTimestamp, long configEpoch, BitSet slots, Set<NodeFlag> flags) {
+
+ this.uri = uri;
+ this.nodeId = nodeId;
+ this.connected = connected;
+ this.slaveOf = slaveOf;
+ this.pingSentTimestamp = pingSentTimestamp;
+ this.pongReceivedTimestamp = pongReceivedTimestamp;
+ this.configEpoch = configEpoch;
+
+ this.slots = new BitSet(slots.length());
+ this.slots.or(slots);
+
+ setFlags(flags);
+ }
+
public RedisClusterNode(RedisClusterNode redisClusterNode) {

LettuceAssert.notNull(redisClusterNode, "RedisClusterNode must not be null");
@@ -225,6 +243,28 @@ public List<Integer> getSlots() {
return slots;
}

+ /**
+ * Performs the given action for each slot of this {@link RedisClusterNode} until all elements have been processed or the
+ * action throws an exception. Unless otherwise specified by the implementing class, actions are performed in the order of
+ * iteration (if an iteration order is specified). Exceptions thrown by the action are relayed to the caller.
+ *
+ * @param consumer
+ * @since 5.2
+ */
+ public void forEachSlot(IntConsumer consumer) {
+
+ if (slots == null || slots.isEmpty()) {
+ return;
+ }
+
+ for (int i = 0; i < this.slots.length(); i++) {
+
+ if (this.slots.get(i)) {
+ consumer.accept(i);
+ }
+ }
+ }
+
/**
* Sets the list of slots for which this {@link RedisClusterNode} is the
* {@link io.lettuce.core.cluster.models.partitions.RedisClusterNode.NodeFlag#MASTER}. The list is empty if this node is not
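To place the new forEachSlot API next to the older getSlots() iteration, a hedged sketch follows. The no-argument RedisClusterNode constructor and setSlots(...) used to build the node are assumptions, since only fragments of them are visible in this diff.

import java.util.Arrays;

import io.lettuce.core.cluster.models.partitions.RedisClusterNode;

public class ForEachSlotSketch {

    public static void main(String[] args) {

        RedisClusterNode node = new RedisClusterNode(); // assumed no-arg constructor
        node.setSlots(Arrays.asList(100, 101, 102, 7000));

        boolean[] covered = new boolean[16384];

        // Before: getSlots() hands out boxed Integers.
        for (Integer slot : node.getSlots()) {
            covered[slot] = true;
        }

        // Since 5.2: primitive iteration, no intermediate List.
        node.forEachSlot(slot -> covered[slot] = true);
    }
}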
ClusterTopologyRefresh.java
@@ -184,19 +184,22 @@ NodeTopologyViews getNodeSpecificViews(Requests requestedTopology, Requests requ
node.addAlias(nodeUri);
}

- List<RedisClusterNodeSnapshot> nodeWithStats = nodeTopologyView.getPartitions() //
- .stream() //
- .filter(ClusterTopologyRefresh::validNode) //
- .map(RedisClusterNodeSnapshot::new).collect(Collectors.toList());
+ List<RedisClusterNodeSnapshot> nodeWithStats = new ArrayList<>(nodeTopologyView.getPartitions().size());

- nodeWithStats.stream() //
- .filter(partition -> partition.is(RedisClusterNode.NodeFlag.MYSELF)) //
- .forEach(partition -> {
+ for (RedisClusterNode partition : nodeTopologyView.getPartitions()) {
+
+ if (validNode(partition)) {
+ RedisClusterNodeSnapshot redisClusterNodeSnapshot = new RedisClusterNodeSnapshot(partition);
+ nodeWithStats.add(redisClusterNodeSnapshot);
+
+ if (partition.is(RedisClusterNode.NodeFlag.MYSELF)) {
+
// record latency for later partition ordering
- latencies.put(partition.getNodeId(), nodeTopologyView.getLatency());
- clientCountByNodeId.put(partition.getNodeId(), nodeTopologyView.getConnectedClients());
- });
+ latencies.put(partition.getNodeId(), nodeTopologyView.getLatency());
+ clientCountByNodeId.put(partition.getNodeId(), nodeTopologyView.getConnectedClients());
+ }
+ }
+ }

allNodes.addAll(nodeWithStats);

@@ -318,8 +321,22 @@ public RedisURI getViewedBy(Map<RedisURI, Partitions> map, Partitions partitions

private static <E> Set<E> difference(Set<E> set1, Set<E> set2) {

- Set<E> result = set1.stream().filter(e -> !set2.contains(e)).collect(Collectors.toSet());
- result.addAll(set2.stream().filter(e -> !set1.contains(e)).collect(Collectors.toList()));
+ Set<E> result = new HashSet<>(set1.size());
+
+ for (E e1 : set1) {
+ if (!set2.contains(e1)) {
+ result.add(e1);
+ }
+ }
+
+ List<E> list = new ArrayList<>(set2.size());
+ for (E e : set2) {
+ if (!set1.contains(e)) {
+ list.add(e);
+ }
+ }
+
+ result.addAll(list);

return result;
}
