Skip to content

Commit

Permalink
Fix connection leak in cluster topology refresh implementation #721
Browse files Browse the repository at this point in the history
Make sure topology/Connections class closes all referenced connections when close() is called.

Original pull request: #722.
  • Loading branch information
Christian Weitendorf authored and mp911de committed Mar 6, 2018
1 parent 5e53f36 commit c6ac7f1
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,16 @@ private boolean hasConnections() {

private Collection<StatefulRedisConnection<String, String>> drainConnections() {

Map<RedisURI, StatefulRedisConnection<String, String>> drainedConnections;
synchronized (this.connections) {

Map<RedisURI, StatefulRedisConnection<String, String>> connections = new HashMap<>(this.connections);
connections.forEach((k, v) -> {
drainedConnections = new HashMap<>(this.connections);
drainedConnections.forEach((k, v) -> {
this.connections.remove(k);
});
}

return connections.values();
return drainedConnections.values();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,22 @@ public void shouldNotFailOnDuplicateSeedNodes() {
verify(nodeConnectionFactory).connectToNodeAsync(any(RedisCodec.class), eq(new InetSocketAddress("127.0.0.1", 7381)));
}

@Test
public void shouldCloseConnections() {

List<RedisURI> seed = Arrays.asList(RedisURI.create("127.0.0.1", 7380), RedisURI.create("127.0.0.1", 7381));

when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(new InetSocketAddress("127.0.0.1", 7380))))
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(new InetSocketAddress("127.0.0.1", 7381))))
.thenReturn(completedFuture((StatefulRedisConnection) connection2));

sut.loadViews(seed, true);

verify(connection1).close();
verify(connection2).close();
}

@Test
public void undiscoveredAdditionalNodesShouldBeLastUsingClientCount() {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.lettuce.core.cluster.topology;

import static org.mockito.Mockito.verify;

import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class ConnectionsTest {

@Mock
private StatefulRedisConnection<String, String> connection1;

@Mock
private StatefulRedisConnection<String, String> connection2;

@Test
public void shouldCloseAllConnections() {
final Connections iut = new Connections();
iut.addConnection(RedisURI.create("127.0.0.1", 7380), connection1);
iut.addConnection(RedisURI.create("127.0.0.1", 7381), connection2);

iut.close();

verify(connection1).close();
verify(connection2).close();
}
}

0 comments on commit c6ac7f1

Please sign in to comment.