From 1ed5403131c84067a38291777d02b7ad8c0d0a7b Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Fri, 6 Oct 2017 10:30:23 +0200 Subject: [PATCH] Cancel overcommitted commands #616 Lettuce now cancels overcommitted commands that exceed the queue size. Overcommitted commands can happen if commands were written to the protocol stack and a disconnect attempts to copy these commands to a smaller disconnected buffer. --- .../core/protocol/DefaultEndpoint.java | 22 +++- .../lettuce/core/AbstractRedisClientTest.java | 4 +- .../io/lettuce/core/ClientOptionsTest.java | 110 +++++++++++++++--- 3 files changed, 113 insertions(+), 23 deletions(-) diff --git a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java index 8193adb07f..338d305b7b 100644 --- a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java +++ b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java @@ -529,6 +529,12 @@ public void notifyDrainQueuedCommands(HasQueuedCommands queuedCommands) { Collection> commands = queuedCommands.drainQueue(); + if (debugEnabled) { + logger.debug("{} notifyQueuedCommands adding {} command(s) to buffer", logPrefix(), commands.size()); + } + + commands.addAll(drainCommands(disconnectedBuffer)); + for (RedisCommand command : commands) { if (command instanceof DemandAware.Sink) { @@ -536,10 +542,20 @@ public void notifyDrainQueuedCommands(HasQueuedCommands queuedCommands) { } } - logger.debug("{} notifyQueuedCommands adding {} command(s) to buffer", logPrefix(), commands.size()); + try { + disconnectedBuffer.addAll(commands); + } catch (RuntimeException e) { + + if (debugEnabled) { + logger.debug("{} notifyQueuedCommands Queue overcommit. Cannot add all commands to buffer (disconnected).", + logPrefix(), commands.size()); + } + commands.removeAll(disconnectedBuffer); - commands.addAll(drainCommands(disconnectedBuffer)); - disconnectedBuffer.addAll(commands); + for (RedisCommand command : commands) { + command.completeExceptionally(e); + } + } if (isConnected()) { flushCommands(disconnectedBuffer); diff --git a/src/test/java/io/lettuce/core/AbstractRedisClientTest.java b/src/test/java/io/lettuce/core/AbstractRedisClientTest.java index 3343eb58e5..96f3fd71c8 100644 --- a/src/test/java/io/lettuce/core/AbstractRedisClientTest.java +++ b/src/test/java/io/lettuce/core/AbstractRedisClientTest.java @@ -82,7 +82,7 @@ public void closeConnection() throws Exception { public abstract class WithPasswordRequired { protected abstract void run(RedisClient client) throws Exception; - public WithPasswordRequired() throws Exception { + public WithPasswordRequired() { try { redis.configSet("requirepass", passwd); redis.auth(passwd); @@ -90,6 +90,8 @@ public WithPasswordRequired() throws Exception { RedisClient client = newRedisClient(); try { run(client); + } catch (Exception e) { + throw new IllegalStateException(e); } finally { FastShutdown.shutdown(client); } diff --git a/src/test/java/io/lettuce/core/ClientOptionsTest.java b/src/test/java/io/lettuce/core/ClientOptionsTest.java index 6e56a84b84..88a154215c 100644 --- a/src/test/java/io/lettuce/core/ClientOptionsTest.java +++ b/src/test/java/io/lettuce/core/ClientOptionsTest.java @@ -17,11 +17,16 @@ import static io.lettuce.ConnectionTestUtil.getChannel; import static io.lettuce.ConnectionTestUtil.getConnectionWatchdog; +import static io.lettuce.ConnectionTestUtil.getStack; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.fail; import java.net.ServerSocket; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; import java.util.concurrent.TimeUnit; import org.junit.Test; @@ -30,6 +35,7 @@ import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.async.RedisAsyncCommands; import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.codec.StringCodec; import io.lettuce.core.codec.Utf8StringCodec; import io.lettuce.core.output.StatusOutput; import io.lettuce.core.protocol.*; @@ -41,17 +47,17 @@ public class ClientOptionsTest extends AbstractRedisClientTest { @Test - public void testNew() throws Exception { + public void testNew() { checkAssertions(ClientOptions.create()); } @Test - public void testBuilder() throws Exception { + public void testBuilder() { checkAssertions(ClientOptions.builder().build()); } @Test - public void testCopy() throws Exception { + public void testCopy() { checkAssertions(ClientOptions.copyOf(ClientOptions.builder().build())); } @@ -64,7 +70,7 @@ protected void checkAssertions(ClientOptions sut) { } @Test - public void variousClientOptions() throws Exception { + public void variousClientOptions() { StatefulRedisConnection connection1 = client.connect(); @@ -83,7 +89,7 @@ public void variousClientOptions() throws Exception { } @Test - public void requestQueueSize() throws Exception { + public void requestQueueSize() { client.setOptions(ClientOptions.builder().requestQueueSize(10).build()); @@ -109,7 +115,73 @@ public void requestQueueSize() throws Exception { } @Test - public void disconnectedWithoutReconnect() throws Exception { + public void requestQueueSizeAppliedForReconnect() { + + client.setOptions(ClientOptions.builder().requestQueueSize(10).build()); + + RedisAsyncCommands connection = client.connect().async(); + ConnectionWatchdog watchdog = getConnectionWatchdog(connection.getStatefulConnection()); + + watchdog.setListenOnChannelInactive(false); + + connection.quit(); + + Wait.untilTrue(() -> !connection.getStatefulConnection().isOpen()).waitOrTimeout(); + + List> pings = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + pings.add(connection.ping()); + } + + watchdog.setListenOnChannelInactive(true); + watchdog.scheduleReconnect(); + + for (RedisFuture ping : pings) { + assertThat(ping.toCompletableFuture().join()).isEqualTo("PONG"); + } + + connection.getStatefulConnection().close(); + } + + @Test + public void requestQueueSizeOvercommittedReconnect() throws Exception { + + client.setOptions(ClientOptions.builder().requestQueueSize(10).build()); + + StatefulRedisConnection connection = client.connect(); + ConnectionWatchdog watchdog = getConnectionWatchdog(connection); + + watchdog.setListenOnChannelInactive(false); + + Queue buffer = getStack(connection); + List> pings = new ArrayList<>(); + for (int i = 0; i < 11; i++) { + + AsyncCommand command = new AsyncCommand<>(new Command<>(CommandType.PING, + new StatusOutput<>(StringCodec.UTF8))); + pings.add(command); + buffer.add(command); + } + + getChannel(connection).disconnect(); + + Wait.untilTrue(() -> !connection.isOpen()).waitOrTimeout(); + + watchdog.setListenOnChannelInactive(true); + watchdog.scheduleReconnect(); + + for (int i = 0; i < 10; i++) { + assertThat(pings.get(i).get()).isEqualTo("PONG"); + } + + assertThatThrownBy(() -> pings.get(10).toCompletableFuture().join()).hasCauseInstanceOf(IllegalStateException.class) + .hasMessage("java.lang.IllegalStateException: Queue full"); + + connection.close(); + } + + @Test + public void disconnectedWithoutReconnect() { client.setOptions(ClientOptions.builder().autoReconnect(false).build()); @@ -127,7 +199,7 @@ public void disconnectedWithoutReconnect() throws Exception { } @Test - public void disconnectedRejectCommands() throws Exception { + public void disconnectedRejectCommands() { client.setOptions(ClientOptions.builder().disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS) .build()); @@ -147,7 +219,7 @@ public void disconnectedRejectCommands() throws Exception { } @Test - public void disconnectedAcceptCommands() throws Exception { + public void disconnectedAcceptCommands() { client.setOptions(ClientOptions.builder().autoReconnect(false) .disconnectedBehavior(ClientOptions.DisconnectedBehavior.ACCEPT_COMMANDS).build()); @@ -161,7 +233,7 @@ public void disconnectedAcceptCommands() throws Exception { } @Test(timeout = 10000) - public void pingBeforeConnect() throws Exception { + public void pingBeforeConnect() { redis.set(key, value); client.setOptions(ClientOptions.builder().pingBeforeActivateConnection(true).build()); @@ -196,11 +268,11 @@ public void pingBeforeConnectTimeout() throws Exception { } @Test - public void pingBeforeConnectWithAuthentication() throws Exception { + public void pingBeforeConnectWithAuthentication() { new WithPasswordRequired() { @Override - protected void run(RedisClient client) throws Exception { + protected void run(RedisClient client) { client.setOptions(ClientOptions.builder().pingBeforeActivateConnection(true).build()); RedisURI redisURI = RedisURI.Builder.redis(host, port).withPassword(passwd).build(); @@ -219,7 +291,7 @@ protected void run(RedisClient client) throws Exception { } @Test(timeout = 2000) - public void pingBeforeConnectWithAuthenticationTimeout() throws Exception { + public void pingBeforeConnectWithAuthenticationTimeout() { new WithPasswordRequired() { @Override @@ -245,11 +317,11 @@ protected void run(RedisClient client) throws Exception { } @Test - public void pingBeforeConnectWithSslAndAuthentication() throws Exception { + public void pingBeforeConnectWithSslAndAuthentication() { new WithPasswordRequired() { @Override - protected void run(RedisClient client) throws Exception { + protected void run(RedisClient client) { client.setOptions(ClientOptions.builder().pingBeforeActivateConnection(true).build()); RedisURI redisURI = RedisURI.Builder.redis(host, 6443).withPassword(passwd).withVerifyPeer(false).withSsl(true) @@ -269,11 +341,11 @@ protected void run(RedisClient client) throws Exception { } @Test - public void pingBeforeConnectWithAuthenticationFails() throws Exception { + public void pingBeforeConnectWithAuthenticationFails() { new WithPasswordRequired() { @Override - protected void run(RedisClient client) throws Exception { + protected void run(RedisClient client) { client.setOptions(ClientOptions.builder().pingBeforeActivateConnection(true).build()); RedisURI redisURI = RedisURI.builder().redis(host, port).build(); @@ -289,11 +361,11 @@ protected void run(RedisClient client) throws Exception { } @Test - public void pingBeforeConnectWithSslAndAuthenticationFails() throws Exception { + public void pingBeforeConnectWithSslAndAuthenticationFails() { new WithPasswordRequired() { @Override - protected void run(RedisClient client) throws Exception { + protected void run(RedisClient client) { client.setOptions(ClientOptions.builder().pingBeforeActivateConnection(true).build()); RedisURI redisURI = RedisURI.builder().redis(host, 6443).withVerifyPeer(false).withSsl(true).build(); @@ -351,7 +423,7 @@ public void pingBeforeConnectWithQueuedCommandsAndReconnect() throws Exception { } @Test(timeout = 10000) - public void authenticatedPingBeforeConnectWithQueuedCommandsAndReconnect() throws Exception { + public void authenticatedPingBeforeConnectWithQueuedCommandsAndReconnect() { new WithPasswordRequired() {