Skip to content

Commit

Permalink
Extend request command limits to accept connection activation commands
Browse files Browse the repository at this point in the history
…#691

Request queue size checks now consider an additional number of connection activation commands.
  • Loading branch information
mp911de committed Feb 16, 2018
1 parent 07ed1b8 commit e38c234
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 3 deletions.
12 changes: 9 additions & 3 deletions src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -480,17 +480,19 @@ private void validateWrite() {

if (usesBoundedQueues()) {

boolean connected = isConnected();

if (QUEUE_SIZE.get(this) + 1 > clientOptions.getRequestQueueSize()) {
throw new RedisException("Request queue size exceeded: " + clientOptions.getRequestQueueSize()
+ ". Commands are not accepted until the queue size drops.");
}

if (disconnectedBuffer.size() + 1 > clientOptions.getRequestQueueSize()) {
if (!connected && disconnectedBuffer.size() + 1 > clientOptions.getRequestQueueSize()) {
throw new RedisException("Request queue size exceeded: " + clientOptions.getRequestQueueSize()
+ ". Commands are not accepted until the queue size drops.");
}

if (commandBuffer.size() + 1 > clientOptions.getRequestQueueSize()) {
if (connected && commandBuffer.size() + 1 > clientOptions.getRequestQueueSize()) {
throw new RedisException("Command buffer size exceeded: " + clientOptions.getRequestQueueSize()
+ ". Commands are not accepted until the queue size drops.");
}
Expand Down Expand Up @@ -843,7 +845,11 @@ private void validateWrite(int commands) {

if (usesBoundedQueues()) {

if (stack.size() + commands > clientOptions.getRequestQueueSize())
// number of maintenance commands (AUTH, CLIENT SETNAME, SELECT, READONLY) should be allowed on top
// of number of user commands to ensure the driver recovers properly from a disconnect
int maxMaintenanceCommands = 5;
int allowedRequestQueueSize = clientOptions.getRequestQueueSize() + maxMaintenanceCommands;
if (stack.size() + commands > allowedRequestQueueSize)
throw new RedisException("Internal stack size exceeded: " + clientOptions.getRequestQueueSize()
+ ". Commands are not accepted until the stack size drops.");
}
Expand Down
60 changes: 60 additions & 0 deletions src/test/java/com/lambdaworks/redis/ClientOptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,66 @@ public void requestQueueSizeAppliedForReconnect() {
client.setOptions(ClientOptions.builder().requestQueueSize(10).build());

RedisAsyncCommands<String, String> connection = client.connect().async();
testHitRequestQueueLimit(connection);
}

@Test
public void testHitRequestQueueLimitReconnectWithAuthCommand() {

new WithPasswordRequired() {

@Override
protected void run(RedisClient client) {

client.setOptions(ClientOptions.builder().requestQueueSize(10).build());

RedisAsyncCommands<String, String> connection = client.connect().async();
connection.auth(passwd);
testHitRequestQueueLimit(connection);
}
};
}

@Test
public void testHitRequestQueueLimitReconnectWithUriAuth() {

new WithPasswordRequired() {

@Override
protected void run(RedisClient client) {

client.setOptions(ClientOptions.builder().requestQueueSize(10).build());

RedisURI redisURI = RedisURI.create(host, port);
redisURI.setPassword(passwd);

RedisAsyncCommands<String, String> connection = client.connect(redisURI).async();
testHitRequestQueueLimit(connection);
}
};
}

@Test
public void testHitRequestQueueLimitReconnectWithUriAuthPingCommand() {

new WithPasswordRequired() {

@Override
protected void run(RedisClient client) {

client.setOptions(ClientOptions.builder().requestQueueSize(10).pingBeforeActivateConnection(true).build());

RedisURI redisURI = RedisURI.create(host, port);
redisURI.setPassword(passwd);

RedisAsyncCommands<String, String> connection = client.connect(redisURI).async();
testHitRequestQueueLimit(connection);
}
};
}

private void testHitRequestQueueLimit(RedisAsyncCommands<String, String> connection) {

ConnectionWatchdog watchdog = getConnectionWatchdog(connection.getStatefulConnection());

watchdog.setListenOnChannelInactive(false);
Expand Down

0 comments on commit e38c234

Please sign in to comment.