diff --git a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java index 505c9ad1f4..4b4fcbccba 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java +++ b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2011-2017 the original author or authors. + * Copyright 2011-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -396,17 +396,19 @@ private void validateWrite() { if (usesBoundedQueues()) { + boolean connected = isConnected(); + if (QUEUE_SIZE.get(this) + 1 > clientOptions.getRequestQueueSize()) { throw new RedisException("Request queue size exceeded: " + clientOptions.getRequestQueueSize() + ". Commands are not accepted until the queue size drops."); } - if (disconnectedBuffer.size() + 1 > clientOptions.getRequestQueueSize()) { + if (!connected && disconnectedBuffer.size() + 1 > clientOptions.getRequestQueueSize()) { throw new RedisException("Request queue size exceeded: " + clientOptions.getRequestQueueSize() + ". Commands are not accepted until the queue size drops."); } - if (commandBuffer.size() + 1 > clientOptions.getRequestQueueSize()) { + if (connected && commandBuffer.size() + 1 > clientOptions.getRequestQueueSize()) { throw new RedisException("Command buffer size exceeded: " + clientOptions.getRequestQueueSize() + ". Commands are not accepted until the queue size drops."); } @@ -759,7 +761,11 @@ private void validateWrite(int commands) { if (usesBoundedQueues()) { - if (stack.size() + commands > clientOptions.getRequestQueueSize()) + // number of maintenance commands (AUTH, CLIENT SETNAME, SELECT, READONLY) should be allowed on top + // of number of user commands to ensure the driver recovers properly from a disconnect + int maxMaintenanceCommands = 5; + int allowedRequestQueueSize = clientOptions.getRequestQueueSize() + maxMaintenanceCommands; + if (stack.size() + commands > allowedRequestQueueSize) throw new RedisException("Internal stack size exceeded: " + clientOptions.getRequestQueueSize() + ". Commands are not accepted until the stack size drops."); } diff --git a/src/test/java/com/lambdaworks/redis/ClientOptionsTest.java b/src/test/java/com/lambdaworks/redis/ClientOptionsTest.java index 1ade663112..6c63d756e5 100644 --- a/src/test/java/com/lambdaworks/redis/ClientOptionsTest.java +++ b/src/test/java/com/lambdaworks/redis/ClientOptionsTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2011-2017 the original author or authors. + * Copyright 2011-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,13 +29,13 @@ import java.util.Queue; import java.util.concurrent.TimeUnit; -import com.lambdaworks.redis.codec.StringCodec; import org.junit.Test; import com.lambdaworks.Wait; import com.lambdaworks.redis.api.StatefulRedisConnection; import com.lambdaworks.redis.api.async.RedisAsyncCommands; import com.lambdaworks.redis.api.sync.RedisCommands; +import com.lambdaworks.redis.codec.StringCodec; import com.lambdaworks.redis.codec.Utf8StringCodec; import com.lambdaworks.redis.output.StatusOutput; import com.lambdaworks.redis.protocol.*; @@ -119,6 +119,66 @@ public void requestQueueSizeAppliedForReconnect() { client.setOptions(ClientOptions.builder().requestQueueSize(10).build()); RedisAsyncCommands connection = client.connect().async(); + testHitRequestQueueLimit(connection); + } + + @Test + public void testHitRequestQueueLimitReconnectWithAuthCommand() { + + new WithPasswordRequired() { + + @Override + protected void run(RedisClient client) { + + client.setOptions(ClientOptions.builder().requestQueueSize(10).build()); + + RedisAsyncCommands connection = client.connect().async(); + connection.auth(passwd); + testHitRequestQueueLimit(connection); + } + }; + } + + @Test + public void testHitRequestQueueLimitReconnectWithUriAuth() { + + new WithPasswordRequired() { + + @Override + protected void run(RedisClient client) { + + client.setOptions(ClientOptions.builder().requestQueueSize(10).build()); + + RedisURI redisURI = RedisURI.create(host, port); + redisURI.setPassword(passwd); + + RedisAsyncCommands connection = client.connect(redisURI).async(); + testHitRequestQueueLimit(connection); + } + }; + } + + @Test + public void testHitRequestQueueLimitReconnectWithUriAuthPingCommand() { + + new WithPasswordRequired() { + + @Override + protected void run(RedisClient client) { + + client.setOptions(ClientOptions.builder().requestQueueSize(10).pingBeforeActivateConnection(true).build()); + + RedisURI redisURI = RedisURI.create(host, port); + redisURI.setPassword(passwd); + + RedisAsyncCommands connection = client.connect(redisURI).async(); + testHitRequestQueueLimit(connection); + } + }; + } + + private void testHitRequestQueueLimit(RedisAsyncCommands connection) { + ConnectionWatchdog watchdog = getConnectionWatchdog(connection.getStatefulConnection()); watchdog.setListenOnChannelInactive(false); diff --git a/src/test/resources/log4j2-test.xml b/src/test/resources/log4j2-test.xml index 672acaaa90..8c8179c966 100644 --- a/src/test/resources/log4j2-test.xml +++ b/src/test/resources/log4j2-test.xml @@ -16,7 +16,6 @@ -