Skip to content

Commit

Permalink
Reset round robin on RedisClusterNode URI change #1909
Browse files Browse the repository at this point in the history
Original pull request: #1912.
  • Loading branch information
christian.lang authored and mp911de committed Dec 3, 2021
1 parent a0ca383 commit d092385
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 2 deletions.
25 changes: 24 additions & 1 deletion src/main/java/io/lettuce/core/cluster/RoundRobin.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.function.BiFunction;

/**
* Circular element provider. This class allows infinite scrolling over a collection with the possibility to provide an initial
Expand All @@ -31,6 +32,16 @@ class RoundRobin<V> {

protected volatile V offset;

private final BiFunction<V, V, Boolean> hasElementChanged;

public RoundRobin() {
this((a, b) -> false);
}

public RoundRobin(BiFunction<V, V, Boolean> hasElementChanged) {
this.hasElementChanged = hasElementChanged;
}

/**
* Return whether this {@link RoundRobin} is still consistent and contains all items from the leader {@link Collection} and
* vice versa.
Expand All @@ -42,7 +53,19 @@ public boolean isConsistent(Collection<? extends V> leader) {

Collection<? extends V> collection = this.collection;

return collection.containsAll(leader) && leader.containsAll(collection);
for (V currentElement : collection) {
boolean found = false;
for (V searchedElement : leader) {
if (searchedElement.equals(currentElement) && !hasElementChanged.apply(currentElement, searchedElement)) {
found = true;
}
}
if (!found) {
return false;
}
}

return collection.size() == leader.size();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public RoundRobinSocketAddressSupplier(Supplier<Partitions> partitions,
LettuceAssert.notNull(sortFunction, "Sort-Function must not be null");

this.partitions = partitions;
this.roundRobin = new RoundRobin<>();
this.roundRobin = new RoundRobin<>((a, b) -> !a.getUri().equals(b.getUri()));
this.sortFunction = (Function) sortFunction;
this.clientResources = clientResources;
resetRoundRobin(partitions.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ class RoundRobinSocketAddressSupplierUnitTests {
private static RedisURI hap2 = new RedisURI("127.0.0.1", 2, Duration.ofSeconds(1));
private static RedisURI hap3 = new RedisURI("127.0.0.1", 3, Duration.ofSeconds(1));
private static RedisURI hap4 = new RedisURI("127.0.0.1", 4, Duration.ofSeconds(1));
private static RedisURI hap5 = new RedisURI("127.0.0.0", 5, Duration.ofSeconds(1));

private static InetSocketAddress addr1 = new InetSocketAddress(hap1.getHost(), hap1.getPort());
private static InetSocketAddress addr2 = new InetSocketAddress(hap2.getHost(), hap2.getPort());
private static InetSocketAddress addr3 = new InetSocketAddress(hap3.getHost(), hap3.getPort());
private static InetSocketAddress addr4 = new InetSocketAddress(hap4.getHost(), hap4.getPort());
private static InetSocketAddress addr5 = new InetSocketAddress(hap5.getHost(), hap5.getPort());

private static Partitions partitions;

Expand Down Expand Up @@ -85,6 +87,24 @@ void noOffset() {
assertThat(sut.get()).isNotEqualTo(addr3);
}

@Test
void nodeIPChanges() {

RoundRobinSocketAddressSupplier sut = new RoundRobinSocketAddressSupplier(() -> partitions,
redisClusterNodes -> redisClusterNodes, clientResourcesMock);

assertThat(sut.get()).isEqualTo(addr1);

assertThat(partitions.remove(new RedisClusterNode(hap1, "2", true, "", 0, 0, 0, new ArrayList<>(), new HashSet<>())))
.isTrue();
assertThat(partitions.add(new RedisClusterNode(hap5, "2", true, "", 0, 0, 0, new ArrayList<>(), new HashSet<>())))
.isTrue();

assertThat(sut.get()).isEqualTo(addr1);
assertThat(sut.get()).isEqualTo(addr3);
assertThat(sut.get()).isEqualTo(addr5);
}

@Test
void partitionTableChangesNewNode() {

Expand Down

0 comments on commit d092385

Please sign in to comment.