diff --git a/src/main/java/io/lettuce/core/masterslave/Connections.java b/src/main/java/io/lettuce/core/masterslave/Connections.java index 6a5b68843c..317941a89f 100644 --- a/src/main/java/io/lettuce/core/masterslave/Connections.java +++ b/src/main/java/io/lettuce/core/masterslave/Connections.java @@ -55,6 +55,8 @@ class Connections extends CompletableEventLatchSupport exceptions = new CopyOnWriteArrayList<>(); private final List nodes; + private volatile boolean closed = false; + public Connections(int expectedConnectionCount, List nodes) { super(expectedConnectionCount); this.nodes = nodes; @@ -63,14 +65,19 @@ public Connections(int expectedConnectionCount, List nodes @Override protected void onAccept(Tuple2> value) { - synchronized (connections) { - connections.put(value.getT1(), value.getT2()); + if (this.closed) { + value.getT2().closeAsync(); + return; + } + + synchronized (this.connections) { + this.connections.put(value.getT1(), value.getT2()); } } @Override protected void onError(Throwable value) { - exceptions.add(value); + this.exceptions.add(value); } @Override @@ -86,11 +93,11 @@ protected void onDrop(Throwable value) { @Override protected void onEmit(Emission emission) { - if (getExpectedCount() != 0 && connections.isEmpty() && !exceptions.isEmpty()) { + if (getExpectedCount() != 0 && this.connections.isEmpty() && !this.exceptions.isEmpty()) { RedisConnectionException collector = new RedisConnectionException( "Unable to establish a connection to Redis Cluster"); - exceptions.forEach(collector::addSuppressed); + this.exceptions.forEach(collector::addSuppressed); emission.error(collector); } else { @@ -102,8 +109,8 @@ protected void onEmit(Emission emission) { * @return {@literal true} if no connections present. */ public boolean isEmpty() { - synchronized (connections) { - return connections.isEmpty(); + synchronized (this.connections) { + return this.connections.isEmpty(); } } @@ -114,8 +121,9 @@ public boolean isEmpty() { */ public Requests requestPing() { - Set>> entries = new LinkedHashSet<>(connections.entrySet()); - Requests requests = new Requests(entries.size(), nodes); + Set>> entries = new LinkedHashSet<>( + this.connections.entrySet()); + Requests requests = new Requests(entries.size(), this.nodes); for (Map.Entry> entry : entries) { @@ -136,17 +144,19 @@ public Requests requestPing() { */ public CompletableFuture closeAsync() { - List close = new ArrayList<>(connections.size()); - List toRemove = new ArrayList<>(connections.size()); + List close = new ArrayList<>(this.connections.size()); + List toRemove = new ArrayList<>(this.connections.size()); + + this.closed = true; - for (Map.Entry> entry : connections.entrySet()) { + for (Map.Entry> entry : this.connections.entrySet()) { toRemove.add(entry.getKey()); close.add(entry.getValue().closeAsync()); } for (RedisURI redisURI : toRemove) { - connections.remove(redisURI); + this.connections.remove(redisURI); } return CompletableFuture.allOf(close.toArray(new CompletableFuture[0])); diff --git a/src/test/java/io/lettuce/core/masterslave/ConnectionsTest.java b/src/test/java/io/lettuce/core/masterslave/ConnectionsTest.java new file mode 100644 index 0000000000..c27642b24f --- /dev/null +++ b/src/test/java/io/lettuce/core/masterslave/ConnectionsTest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.masterslave; + +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.concurrent.CompletableFuture; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import reactor.util.function.Tuples; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; + +/** + * @author Mark Paluch + */ +@RunWith(MockitoJUnitRunner.class) +public class ConnectionsTest { + + @Mock + private StatefulRedisConnection connection1; + + @Before + public void before() { + when(connection1.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + } + + @Test + public void shouldCloseConnectionCompletingAfterCloseSignal() { + + Connections connections = new Connections(5, Collections.emptyList()); + connections.closeAsync(); + + verifyZeroInteractions(connection1); + + connections.onAccept(Tuples.of(RedisURI.create("localhost", 6379), connection1)); + + verify(connection1).closeAsync(); + } +}