Skip to content

Commit

Permalink
Ensure Master/Slave topology refresh connections are closed after tim…
Browse files Browse the repository at this point in the history
…eout #723

Lettuce now closes Master/Slave topology refresh connections if connections complete the connect progress after the actual refresh finishes. This could happen if a connection finished its connect (slow connect) after the synchronization timeout and after all topology views were obtained so the process already ran its cleanup.
  • Loading branch information
mp911de committed Mar 6, 2018
1 parent 3b72614 commit 0ae34d4
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 13 deletions.
36 changes: 23 additions & 13 deletions src/main/java/io/lettuce/core/masterslave/Connections.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class Connections extends CompletableEventLatchSupport<Tuple2<RedisURI, Stateful
private final List<Throwable> exceptions = new CopyOnWriteArrayList<>();
private final List<RedisNodeDescription> nodes;

private volatile boolean closed = false;

public Connections(int expectedConnectionCount, List<RedisNodeDescription> nodes) {
super(expectedConnectionCount);
this.nodes = nodes;
Expand All @@ -63,14 +65,19 @@ public Connections(int expectedConnectionCount, List<RedisNodeDescription> nodes
@Override
protected void onAccept(Tuple2<RedisURI, StatefulRedisConnection<String, String>> value) {

synchronized (connections) {
connections.put(value.getT1(), value.getT2());
if (this.closed) {
value.getT2().closeAsync();
return;
}

synchronized (this.connections) {
this.connections.put(value.getT1(), value.getT2());
}
}

@Override
protected void onError(Throwable value) {
exceptions.add(value);
this.exceptions.add(value);
}

@Override
Expand All @@ -86,11 +93,11 @@ protected void onDrop(Throwable value) {
@Override
protected void onEmit(Emission<Connections> emission) {

if (getExpectedCount() != 0 && connections.isEmpty() && !exceptions.isEmpty()) {
if (getExpectedCount() != 0 && this.connections.isEmpty() && !this.exceptions.isEmpty()) {

RedisConnectionException collector = new RedisConnectionException(
"Unable to establish a connection to Redis Cluster");
exceptions.forEach(collector::addSuppressed);
this.exceptions.forEach(collector::addSuppressed);

emission.error(collector);
} else {
Expand All @@ -102,8 +109,8 @@ protected void onEmit(Emission<Connections> emission) {
* @return {@literal true} if no connections present.
*/
public boolean isEmpty() {
synchronized (connections) {
return connections.isEmpty();
synchronized (this.connections) {
return this.connections.isEmpty();
}
}

Expand All @@ -114,8 +121,9 @@ public boolean isEmpty() {
*/
public Requests requestPing() {

Set<Map.Entry<RedisURI, StatefulRedisConnection<String, String>>> entries = new LinkedHashSet<>(connections.entrySet());
Requests requests = new Requests(entries.size(), nodes);
Set<Map.Entry<RedisURI, StatefulRedisConnection<String, String>>> entries = new LinkedHashSet<>(
this.connections.entrySet());
Requests requests = new Requests(entries.size(), this.nodes);

for (Map.Entry<RedisURI, StatefulRedisConnection<String, String>> entry : entries) {

Expand All @@ -136,17 +144,19 @@ public Requests requestPing() {
*/
public CompletableFuture<Void> closeAsync() {

List<CompletableFuture> close = new ArrayList<>(connections.size());
List<RedisURI> toRemove = new ArrayList<>(connections.size());
List<CompletableFuture> close = new ArrayList<>(this.connections.size());
List<RedisURI> toRemove = new ArrayList<>(this.connections.size());

this.closed = true;

for (Map.Entry<RedisURI, StatefulRedisConnection<String, String>> entry : connections.entrySet()) {
for (Map.Entry<RedisURI, StatefulRedisConnection<String, String>> entry : this.connections.entrySet()) {

toRemove.add(entry.getKey());
close.add(entry.getValue().closeAsync());
}

for (RedisURI redisURI : toRemove) {
connections.remove(redisURI);
this.connections.remove(redisURI);
}

return CompletableFuture.allOf(close.toArray(new CompletableFuture[0]));
Expand Down
61 changes: 61 additions & 0 deletions src/test/java/io/lettuce/core/masterslave/ConnectionsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core.masterslave;

import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import reactor.util.function.Tuples;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;

/**
* @author Mark Paluch
*/
@RunWith(MockitoJUnitRunner.class)
public class ConnectionsTest {

@Mock
private StatefulRedisConnection<String, String> connection1;

@Before
public void before() {
when(connection1.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
}

@Test
public void shouldCloseConnectionCompletingAfterCloseSignal() {

Connections connections = new Connections(5, Collections.emptyList());
connections.closeAsync();

verifyZeroInteractions(connection1);

connections.onAccept(Tuples.of(RedisURI.create("localhost", 6379), connection1));

verify(connection1).closeAsync();
}
}

0 comments on commit 0ae34d4

Please sign in to comment.