From e38c23428a5c394f1313804fc0c1772a67890134 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Fri, 16 Feb 2018 13:11:48 +0100 Subject: [PATCH] Extend request command limits to accept connection activation commands #691 Request queue size checks now consider an additional number of connection activation commands. --- .../redis/protocol/CommandHandler.java | 12 +++- .../lambdaworks/redis/ClientOptionsTest.java | 60 +++++++++++++++++++ 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java index dcdaca1ad0..e1b8d46ffe 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java +++ b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java @@ -480,17 +480,19 @@ private void validateWrite() { if (usesBoundedQueues()) { + boolean connected = isConnected(); + if (QUEUE_SIZE.get(this) + 1 > clientOptions.getRequestQueueSize()) { throw new RedisException("Request queue size exceeded: " + clientOptions.getRequestQueueSize() + ". Commands are not accepted until the queue size drops."); } - if (disconnectedBuffer.size() + 1 > clientOptions.getRequestQueueSize()) { + if (!connected && disconnectedBuffer.size() + 1 > clientOptions.getRequestQueueSize()) { throw new RedisException("Request queue size exceeded: " + clientOptions.getRequestQueueSize() + ". Commands are not accepted until the queue size drops."); } - if (commandBuffer.size() + 1 > clientOptions.getRequestQueueSize()) { + if (connected && commandBuffer.size() + 1 > clientOptions.getRequestQueueSize()) { throw new RedisException("Command buffer size exceeded: " + clientOptions.getRequestQueueSize() + ". Commands are not accepted until the queue size drops."); } @@ -843,7 +845,11 @@ private void validateWrite(int commands) { if (usesBoundedQueues()) { - if (stack.size() + commands > clientOptions.getRequestQueueSize()) + // number of maintenance commands (AUTH, CLIENT SETNAME, SELECT, READONLY) should be allowed on top + // of number of user commands to ensure the driver recovers properly from a disconnect + int maxMaintenanceCommands = 5; + int allowedRequestQueueSize = clientOptions.getRequestQueueSize() + maxMaintenanceCommands; + if (stack.size() + commands > allowedRequestQueueSize) throw new RedisException("Internal stack size exceeded: " + clientOptions.getRequestQueueSize() + ". Commands are not accepted until the stack size drops."); } diff --git a/src/test/java/com/lambdaworks/redis/ClientOptionsTest.java b/src/test/java/com/lambdaworks/redis/ClientOptionsTest.java index 0a1cbee134..6c63d756e5 100644 --- a/src/test/java/com/lambdaworks/redis/ClientOptionsTest.java +++ b/src/test/java/com/lambdaworks/redis/ClientOptionsTest.java @@ -119,6 +119,66 @@ public void requestQueueSizeAppliedForReconnect() { client.setOptions(ClientOptions.builder().requestQueueSize(10).build()); RedisAsyncCommands connection = client.connect().async(); + testHitRequestQueueLimit(connection); + } + + @Test + public void testHitRequestQueueLimitReconnectWithAuthCommand() { + + new WithPasswordRequired() { + + @Override + protected void run(RedisClient client) { + + client.setOptions(ClientOptions.builder().requestQueueSize(10).build()); + + RedisAsyncCommands connection = client.connect().async(); + connection.auth(passwd); + testHitRequestQueueLimit(connection); + } + }; + } + + @Test + public void testHitRequestQueueLimitReconnectWithUriAuth() { + + new WithPasswordRequired() { + + @Override + protected void run(RedisClient client) { + + client.setOptions(ClientOptions.builder().requestQueueSize(10).build()); + + RedisURI redisURI = RedisURI.create(host, port); + redisURI.setPassword(passwd); + + RedisAsyncCommands connection = client.connect(redisURI).async(); + testHitRequestQueueLimit(connection); + } + }; + } + + @Test + public void testHitRequestQueueLimitReconnectWithUriAuthPingCommand() { + + new WithPasswordRequired() { + + @Override + protected void run(RedisClient client) { + + client.setOptions(ClientOptions.builder().requestQueueSize(10).pingBeforeActivateConnection(true).build()); + + RedisURI redisURI = RedisURI.create(host, port); + redisURI.setPassword(passwd); + + RedisAsyncCommands connection = client.connect(redisURI).async(); + testHitRequestQueueLimit(connection); + } + }; + } + + private void testHitRequestQueueLimit(RedisAsyncCommands connection) { + ConnectionWatchdog watchdog = getConnectionWatchdog(connection.getStatefulConnection()); watchdog.setListenOnChannelInactive(false);