Skip to content

Commit

Permalink
Make RoundRobin and RoundRobinSocketAddressSupplier threadsafe #663
Browse files Browse the repository at this point in the history
Replace round-robin collection instead of updating it on change.
  • Loading branch information
mp911de committed Dec 11, 2017
1 parent bd0bafc commit cacd1e3
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 23 deletions.
40 changes: 31 additions & 9 deletions src/main/java/io/lettuce/core/cluster/RoundRobin.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/
package io.lettuce.core.cluster;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

/**
* Circular element provider. This class allows infinite scrolling over a collection with the possibility to provide an initial
Expand All @@ -25,17 +27,33 @@
*/
class RoundRobin<V> {

protected final Collection<? extends V> collection;
protected volatile Collection<? extends V> collection = Collections.emptyList();

protected V offset;
protected volatile V offset;

public RoundRobin(Collection<? extends V> collection) {
this(collection, null);
/**
* Return whether this {@link RoundRobin} is still consistent and contains all items from the master {@link Collection} and
* vice versa.
*
* @param master the master collection containing source elements for this {@link RoundRobin}.
* @return {@literal true} if this {@link RoundRobin} is consistent with the master {@link Collection}.
*/
public boolean isConsistent(Collection<? extends V> master) {

Collection<? extends V> collection = this.collection;

return collection.containsAll(master) && master.containsAll(collection);
}

public RoundRobin(Collection<? extends V> collection, V offset) {
this.collection = collection;
this.offset = offset;
/**
* Rebuild the {@link RoundRobin} from the master {@link Collection}.
*
* @param master the master collection containing source elements for this {@link RoundRobin}.
*/
public void rebuild(Collection<? extends V> master) {

this.collection = new ArrayList<>(master);
this.offset = null;
}

/**
Expand All @@ -44,6 +62,10 @@ public RoundRobin(Collection<? extends V> collection, V offset) {
* @return the next item
*/
public V next() {

Collection<? extends V> collection = this.collection;
V offset = this.offset;

if (offset != null) {
boolean accept = false;
for (V element : collection) {
Expand All @@ -53,11 +75,11 @@ public V next() {
}

if (accept) {
return offset = element;
return this.offset = element;
}
}
}

return offset = collection.iterator().next();
return this.offset = collection.iterator().next();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.lettuce.core.cluster;

import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.function.Function;
import java.util.function.Supplier;
Expand All @@ -39,11 +38,10 @@ class RoundRobinSocketAddressSupplier implements Supplier<SocketAddress> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(RoundRobinSocketAddressSupplier.class);

private final Collection<RedisClusterNode> partitions;
private final Collection<RedisClusterNode> clusterNodes = new ArrayList<>();
private final Function<Collection<RedisClusterNode>, Collection<RedisClusterNode>> sortFunction;
private final ClientResources clientResources;

private RoundRobin<? extends RedisClusterNode> roundRobin;
private RoundRobin<RedisClusterNode> roundRobin;

public RoundRobinSocketAddressSupplier(Collection<RedisClusterNode> partitions,
Function<? extends Collection<RedisClusterNode>, Collection<RedisClusterNode>> sortFunction,
Expand All @@ -53,8 +51,7 @@ public RoundRobinSocketAddressSupplier(Collection<RedisClusterNode> partitions,
LettuceAssert.notNull(sortFunction, "Sort-Function must not be null");

this.partitions = partitions;
this.clusterNodes.addAll(partitions);
this.roundRobin = new RoundRobin<>(clusterNodes);
this.roundRobin = new RoundRobin<>();
this.sortFunction = (Function) sortFunction;
this.clientResources = clientResources;
resetRoundRobin();
Expand All @@ -63,7 +60,7 @@ public RoundRobinSocketAddressSupplier(Collection<RedisClusterNode> partitions,
@Override
public SocketAddress get() {

if (!clusterNodes.containsAll(partitions) || !partitions.containsAll(clusterNodes)) {
if (!roundRobin.isConsistent(partitions)) {
resetRoundRobin();
}

Expand All @@ -72,10 +69,7 @@ public SocketAddress get() {
}

protected void resetRoundRobin() {

clusterNodes.clear();
clusterNodes.addAll(sortFunction.apply(partitions));
roundRobin.offset = null;
roundRobin.rebuild(sortFunction.apply(partitions));
}

protected SocketAddress getSocketAddress(RedisClusterNode redisClusterNode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class RoundRobinSocketAddressSupplierTest {
private ClientResources clientResourcesMock;

@BeforeClass
public static void beforeClass() throws Exception {
public static void beforeClass() {

hap1.getResolvedAddress();
hap2.getResolvedAddress();
Expand All @@ -72,7 +72,7 @@ public void before() throws Exception {
}

@Test
public void noOffset() throws Exception {
public void noOffset() {

RoundRobinSocketAddressSupplier sut = new RoundRobinSocketAddressSupplier(partitions,
redisClusterNodes -> redisClusterNodes, clientResourcesMock);
Expand All @@ -86,7 +86,7 @@ public void noOffset() throws Exception {
}

@Test
public void partitionTableChangesNewNode() throws Exception {
public void partitionTableChangesNewNode() {

RoundRobinSocketAddressSupplier sut = new RoundRobinSocketAddressSupplier(partitions,
redisClusterNodes -> redisClusterNodes, clientResourcesMock);
Expand All @@ -104,7 +104,7 @@ public void partitionTableChangesNewNode() throws Exception {
}

@Test
public void partitionTableChangesNodeRemoved() throws Exception {
public void partitionTableChangesNodeRemoved() {

RoundRobinSocketAddressSupplier sut = new RoundRobinSocketAddressSupplier(partitions,
redisClusterNodes -> redisClusterNodes, clientResourcesMock);
Expand Down

0 comments on commit cacd1e3

Please sign in to comment.