Skip to content

Commit

Permalink
Cancel overcommitted commands #616
Browse files Browse the repository at this point in the history
Lettuce now cancels overcommitted commands that exceed the queue size. Overcommitted commands can happen if commands were written to the protocol stack and a disconnect attempts to copy these commands to a smaller disconnected buffer.
  • Loading branch information
mp911de committed Oct 6, 2017
1 parent bd073d6 commit 1ed5403
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 23 deletions.
22 changes: 19 additions & 3 deletions src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -529,17 +529,33 @@ public void notifyDrainQueuedCommands(HasQueuedCommands queuedCommands) {

Collection<RedisCommand<?, ?, ?>> commands = queuedCommands.drainQueue();

if (debugEnabled) {
logger.debug("{} notifyQueuedCommands adding {} command(s) to buffer", logPrefix(), commands.size());
}

commands.addAll(drainCommands(disconnectedBuffer));

for (RedisCommand<?, ?, ?> command : commands) {

if (command instanceof DemandAware.Sink) {
((DemandAware.Sink) command).removeSource();
}
}

logger.debug("{} notifyQueuedCommands adding {} command(s) to buffer", logPrefix(), commands.size());
try {
disconnectedBuffer.addAll(commands);
} catch (RuntimeException e) {

if (debugEnabled) {
logger.debug("{} notifyQueuedCommands Queue overcommit. Cannot add all commands to buffer (disconnected).",
logPrefix(), commands.size());
}
commands.removeAll(disconnectedBuffer);

commands.addAll(drainCommands(disconnectedBuffer));
disconnectedBuffer.addAll(commands);
for (RedisCommand<?, ?, ?> command : commands) {
command.completeExceptionally(e);
}
}

if (isConnected()) {
flushCommands(disconnectedBuffer);
Expand Down
4 changes: 3 additions & 1 deletion src/test/java/io/lettuce/core/AbstractRedisClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,16 @@ public void closeConnection() throws Exception {
public abstract class WithPasswordRequired {
protected abstract void run(RedisClient client) throws Exception;

public WithPasswordRequired() throws Exception {
public WithPasswordRequired() {
try {
redis.configSet("requirepass", passwd);
redis.auth(passwd);

RedisClient client = newRedisClient();
try {
run(client);
} catch (Exception e) {
throw new IllegalStateException(e);
} finally {
FastShutdown.shutdown(client);
}
Expand Down
110 changes: 91 additions & 19 deletions src/test/java/io/lettuce/core/ClientOptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@

import static io.lettuce.ConnectionTestUtil.getChannel;
import static io.lettuce.ConnectionTestUtil.getConnectionWatchdog;
import static io.lettuce.ConnectionTestUtil.getStack;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;

import java.net.ServerSocket;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
Expand All @@ -30,6 +35,7 @@
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.codec.Utf8StringCodec;
import io.lettuce.core.output.StatusOutput;
import io.lettuce.core.protocol.*;
Expand All @@ -41,17 +47,17 @@
public class ClientOptionsTest extends AbstractRedisClientTest {

@Test
public void testNew() throws Exception {
public void testNew() {
checkAssertions(ClientOptions.create());
}

@Test
public void testBuilder() throws Exception {
public void testBuilder() {
checkAssertions(ClientOptions.builder().build());
}

@Test
public void testCopy() throws Exception {
public void testCopy() {
checkAssertions(ClientOptions.copyOf(ClientOptions.builder().build()));
}

Expand All @@ -64,7 +70,7 @@ protected void checkAssertions(ClientOptions sut) {
}

@Test
public void variousClientOptions() throws Exception {
public void variousClientOptions() {

StatefulRedisConnection<String, String> connection1 = client.connect();

Expand All @@ -83,7 +89,7 @@ public void variousClientOptions() throws Exception {
}

@Test
public void requestQueueSize() throws Exception {
public void requestQueueSize() {

client.setOptions(ClientOptions.builder().requestQueueSize(10).build());

Expand All @@ -109,7 +115,73 @@ public void requestQueueSize() throws Exception {
}

@Test
public void disconnectedWithoutReconnect() throws Exception {
public void requestQueueSizeAppliedForReconnect() {

client.setOptions(ClientOptions.builder().requestQueueSize(10).build());

RedisAsyncCommands<String, String> connection = client.connect().async();
ConnectionWatchdog watchdog = getConnectionWatchdog(connection.getStatefulConnection());

watchdog.setListenOnChannelInactive(false);

connection.quit();

Wait.untilTrue(() -> !connection.getStatefulConnection().isOpen()).waitOrTimeout();

List<RedisFuture<String>> pings = new ArrayList<>();
for (int i = 0; i < 10; i++) {
pings.add(connection.ping());
}

watchdog.setListenOnChannelInactive(true);
watchdog.scheduleReconnect();

for (RedisFuture<String> ping : pings) {
assertThat(ping.toCompletableFuture().join()).isEqualTo("PONG");
}

connection.getStatefulConnection().close();
}

@Test
public void requestQueueSizeOvercommittedReconnect() throws Exception {

client.setOptions(ClientOptions.builder().requestQueueSize(10).build());

StatefulRedisConnection<String, String> connection = client.connect();
ConnectionWatchdog watchdog = getConnectionWatchdog(connection);

watchdog.setListenOnChannelInactive(false);

Queue<Object> buffer = getStack(connection);
List<RedisFuture<String>> pings = new ArrayList<>();
for (int i = 0; i < 11; i++) {

AsyncCommand<String, String, String> command = new AsyncCommand<>(new Command<>(CommandType.PING,
new StatusOutput<>(StringCodec.UTF8)));
pings.add(command);
buffer.add(command);
}

getChannel(connection).disconnect();

Wait.untilTrue(() -> !connection.isOpen()).waitOrTimeout();

watchdog.setListenOnChannelInactive(true);
watchdog.scheduleReconnect();

for (int i = 0; i < 10; i++) {
assertThat(pings.get(i).get()).isEqualTo("PONG");
}

assertThatThrownBy(() -> pings.get(10).toCompletableFuture().join()).hasCauseInstanceOf(IllegalStateException.class)
.hasMessage("java.lang.IllegalStateException: Queue full");

connection.close();
}

@Test
public void disconnectedWithoutReconnect() {

client.setOptions(ClientOptions.builder().autoReconnect(false).build());

Expand All @@ -127,7 +199,7 @@ public void disconnectedWithoutReconnect() throws Exception {
}

@Test
public void disconnectedRejectCommands() throws Exception {
public void disconnectedRejectCommands() {

client.setOptions(ClientOptions.builder().disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS)
.build());
Expand All @@ -147,7 +219,7 @@ public void disconnectedRejectCommands() throws Exception {
}

@Test
public void disconnectedAcceptCommands() throws Exception {
public void disconnectedAcceptCommands() {

client.setOptions(ClientOptions.builder().autoReconnect(false)
.disconnectedBehavior(ClientOptions.DisconnectedBehavior.ACCEPT_COMMANDS).build());
Expand All @@ -161,7 +233,7 @@ public void disconnectedAcceptCommands() throws Exception {
}

@Test(timeout = 10000)
public void pingBeforeConnect() throws Exception {
public void pingBeforeConnect() {

redis.set(key, value);
client.setOptions(ClientOptions.builder().pingBeforeActivateConnection(true).build());
Expand Down Expand Up @@ -196,11 +268,11 @@ public void pingBeforeConnectTimeout() throws Exception {
}

@Test
public void pingBeforeConnectWithAuthentication() throws Exception {
public void pingBeforeConnectWithAuthentication() {

new WithPasswordRequired() {
@Override
protected void run(RedisClient client) throws Exception {
protected void run(RedisClient client) {

client.setOptions(ClientOptions.builder().pingBeforeActivateConnection(true).build());
RedisURI redisURI = RedisURI.Builder.redis(host, port).withPassword(passwd).build();
Expand All @@ -219,7 +291,7 @@ protected void run(RedisClient client) throws Exception {
}

@Test(timeout = 2000)
public void pingBeforeConnectWithAuthenticationTimeout() throws Exception {
public void pingBeforeConnectWithAuthenticationTimeout() {

new WithPasswordRequired() {
@Override
Expand All @@ -245,11 +317,11 @@ protected void run(RedisClient client) throws Exception {
}

@Test
public void pingBeforeConnectWithSslAndAuthentication() throws Exception {
public void pingBeforeConnectWithSslAndAuthentication() {

new WithPasswordRequired() {
@Override
protected void run(RedisClient client) throws Exception {
protected void run(RedisClient client) {

client.setOptions(ClientOptions.builder().pingBeforeActivateConnection(true).build());
RedisURI redisURI = RedisURI.Builder.redis(host, 6443).withPassword(passwd).withVerifyPeer(false).withSsl(true)
Expand All @@ -269,11 +341,11 @@ protected void run(RedisClient client) throws Exception {
}

@Test
public void pingBeforeConnectWithAuthenticationFails() throws Exception {
public void pingBeforeConnectWithAuthenticationFails() {

new WithPasswordRequired() {
@Override
protected void run(RedisClient client) throws Exception {
protected void run(RedisClient client) {

client.setOptions(ClientOptions.builder().pingBeforeActivateConnection(true).build());
RedisURI redisURI = RedisURI.builder().redis(host, port).build();
Expand All @@ -289,11 +361,11 @@ protected void run(RedisClient client) throws Exception {
}

@Test
public void pingBeforeConnectWithSslAndAuthenticationFails() throws Exception {
public void pingBeforeConnectWithSslAndAuthenticationFails() {

new WithPasswordRequired() {
@Override
protected void run(RedisClient client) throws Exception {
protected void run(RedisClient client) {

client.setOptions(ClientOptions.builder().pingBeforeActivateConnection(true).build());
RedisURI redisURI = RedisURI.builder().redis(host, 6443).withVerifyPeer(false).withSsl(true).build();
Expand Down Expand Up @@ -351,7 +423,7 @@ public void pingBeforeConnectWithQueuedCommandsAndReconnect() throws Exception {
}

@Test(timeout = 10000)
public void authenticatedPingBeforeConnectWithQueuedCommandsAndReconnect() throws Exception {
public void authenticatedPingBeforeConnectWithQueuedCommandsAndReconnect() {

new WithPasswordRequired() {

Expand Down

0 comments on commit 1ed5403

Please sign in to comment.