From a96e243174fd2b0c3bf6cd2863b294b8a7c0235f Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Wed, 9 Jun 2021 09:18:33 +0200 Subject: [PATCH] Fix command retrigger when closing ClusterNodeEndpoint asynchronously #1769 ClusterNodeEndpoint now overrides the correct close method. --- .../core/cluster/ClusterNodeEndpoint.java | 7 +++-- .../cluster/ClusterNodeEndpointUnitTests.java | 28 +++++++++---------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/src/main/java/io/lettuce/core/cluster/ClusterNodeEndpoint.java b/src/main/java/io/lettuce/core/cluster/ClusterNodeEndpoint.java index 19f731f5ab..8933683f5c 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterNodeEndpoint.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterNodeEndpoint.java @@ -16,6 +16,7 @@ package io.lettuce.core.cluster; import java.util.Collection; +import java.util.concurrent.CompletableFuture; import io.lettuce.core.ClientOptions; import io.lettuce.core.RedisChannelWriter; @@ -61,15 +62,15 @@ public ClusterNodeEndpoint(ClientOptions clientOptions, ClientResources clientRe * and retries on their own. */ @Override - public void close() { + public CompletableFuture closeAsync() { - logger.debug("{} close()", logPrefix()); + logger.debug("{} closeAsync()", logPrefix()); if (clusterChannelWriter != null) { retriggerCommands(doExclusive(this::drainCommands)); } - super.close(); + return super.closeAsync(); } protected void retriggerCommands(Collection> commands) { diff --git a/src/test/java/io/lettuce/core/cluster/ClusterNodeEndpointUnitTests.java b/src/test/java/io/lettuce/core/cluster/ClusterNodeEndpointUnitTests.java index b15b159395..b17bc23278 100644 --- a/src/test/java/io/lettuce/core/cluster/ClusterNodeEndpointUnitTests.java +++ b/src/test/java/io/lettuce/core/cluster/ClusterNodeEndpointUnitTests.java @@ -17,9 +17,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import java.util.Queue; @@ -28,7 +26,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import io.lettuce.test.ReflectionTestUtils; import io.lettuce.core.ClientOptions; import io.lettuce.core.RedisChannelWriter; @@ -40,9 +37,12 @@ import io.lettuce.core.protocol.CommandType; import io.lettuce.core.protocol.RedisCommand; import io.lettuce.core.resource.ClientResources; +import io.lettuce.test.ReflectionTestUtils; import io.lettuce.test.TestFutures; /** + * Unit tests for {@link ClusterNodeEndpoint}. + * * @author Mark Paluch */ @ExtendWith(MockitoExtension.class) @@ -76,8 +76,8 @@ void before() { @Test void closeWithoutCommands() { - sut.close(); - verifyZeroInteractions(clusterChannelWriter); + sut.closeAsync(); + verifyNoInteractions(clusterChannelWriter); } @Test @@ -85,7 +85,7 @@ void closeWithQueuedCommands() { disconnectedBuffer.add(command); - sut.close(); + sut.closeAsync(); verify(clusterChannelWriter).write(command); } @@ -96,9 +96,9 @@ void closeWithCancelledQueuedCommands() { disconnectedBuffer.add(command); command.cancel(); - sut.close(); + sut.closeAsync(); - verifyZeroInteractions(clusterChannelWriter); + verifyNoInteractions(clusterChannelWriter); } @Test @@ -107,7 +107,7 @@ void closeWithQueuedCommandsFails() { disconnectedBuffer.add(command); when(clusterChannelWriter.write(any(RedisCommand.class))).thenThrow(new RedisException("meh")); - sut.close(); + sut.closeAsync(); assertThat(command.isDone()).isTrue(); @@ -122,7 +122,7 @@ void closeWithBufferedCommands() { sut.write(command); - sut.close(); + sut.closeAsync(); verify(clusterChannelWriter).write(command); } @@ -136,9 +136,9 @@ void closeWithCancelledBufferedCommands() { sut.write(command); command.cancel(); - sut.close(); + sut.closeAsync(); - verifyZeroInteractions(clusterChannelWriter); + verifyNoInteractions(clusterChannelWriter); } @Test @@ -150,7 +150,7 @@ void closeWithBufferedCommandsFails() { sut.write(command); when(clusterChannelWriter.write(any(RedisCommand.class))).thenThrow(new RedisException("")); - sut.close(); + sut.closeAsync(); assertThatThrownBy(() -> TestFutures.awaitOrTimeout(command)).isInstanceOf(RedisException.class); }