Skip to content

Commit

Permalink
Fix command retrigger when closing ClusterNodeEndpoint asynchronously #…
Browse files Browse the repository at this point in the history
…1769

ClusterNodeEndpoint now overrides the correct close method.
  • Loading branch information
mp911de committed Jun 9, 2021
1 parent 31f8635 commit a96e243
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.lettuce.core.cluster;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisChannelWriter;
Expand Down Expand Up @@ -61,15 +62,15 @@ public ClusterNodeEndpoint(ClientOptions clientOptions, ClientResources clientRe
* and retries on their own.
*/
@Override
public void close() {
public CompletableFuture<Void> closeAsync() {

logger.debug("{} close()", logPrefix());
logger.debug("{} closeAsync()", logPrefix());

if (clusterChannelWriter != null) {
retriggerCommands(doExclusive(this::drainCommands));
}

super.close();
return super.closeAsync();
}

protected void retriggerCommands(Collection<RedisCommand<?, ?, ?>> commands) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

import java.util.Queue;

Expand All @@ -28,7 +26,6 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import io.lettuce.test.ReflectionTestUtils;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisChannelWriter;
Expand All @@ -40,9 +37,12 @@
import io.lettuce.core.protocol.CommandType;
import io.lettuce.core.protocol.RedisCommand;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.test.ReflectionTestUtils;
import io.lettuce.test.TestFutures;

/**
* Unit tests for {@link ClusterNodeEndpoint}.
*
* @author Mark Paluch
*/
@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -76,16 +76,16 @@ void before() {
@Test
void closeWithoutCommands() {

sut.close();
verifyZeroInteractions(clusterChannelWriter);
sut.closeAsync();
verifyNoInteractions(clusterChannelWriter);
}

@Test
void closeWithQueuedCommands() {

disconnectedBuffer.add(command);

sut.close();
sut.closeAsync();

verify(clusterChannelWriter).write(command);
}
Expand All @@ -96,9 +96,9 @@ void closeWithCancelledQueuedCommands() {
disconnectedBuffer.add(command);
command.cancel();

sut.close();
sut.closeAsync();

verifyZeroInteractions(clusterChannelWriter);
verifyNoInteractions(clusterChannelWriter);
}

@Test
Expand All @@ -107,7 +107,7 @@ void closeWithQueuedCommandsFails() {
disconnectedBuffer.add(command);
when(clusterChannelWriter.write(any(RedisCommand.class))).thenThrow(new RedisException("meh"));

sut.close();
sut.closeAsync();

assertThat(command.isDone()).isTrue();

Expand All @@ -122,7 +122,7 @@ void closeWithBufferedCommands() {

sut.write(command);

sut.close();
sut.closeAsync();

verify(clusterChannelWriter).write(command);
}
Expand All @@ -136,9 +136,9 @@ void closeWithCancelledBufferedCommands() {
sut.write(command);
command.cancel();

sut.close();
sut.closeAsync();

verifyZeroInteractions(clusterChannelWriter);
verifyNoInteractions(clusterChannelWriter);
}

@Test
Expand All @@ -150,7 +150,7 @@ void closeWithBufferedCommandsFails() {
sut.write(command);
when(clusterChannelWriter.write(any(RedisCommand.class))).thenThrow(new RedisException(""));

sut.close();
sut.closeAsync();

assertThatThrownBy(() -> TestFutures.awaitOrTimeout(command)).isInstanceOf(RedisException.class);
}
Expand Down

0 comments on commit a96e243

Please sign in to comment.