Skip to content

Commit

Permalink
Extend request command limits to accept connection activation commands
Browse files Browse the repository at this point in the history
…#691

Request queue size checks now consider an additional number of connection activation commands.
  • Loading branch information
mp911de committed Feb 16, 2018
1 parent acb6f99 commit 0bb8103
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 7 deletions.
14 changes: 10 additions & 4 deletions src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2017 the original author or authors.
* Copyright 2011-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -396,17 +396,19 @@ private void validateWrite() {

if (usesBoundedQueues()) {

boolean connected = isConnected();

if (QUEUE_SIZE.get(this) + 1 > clientOptions.getRequestQueueSize()) {
throw new RedisException("Request queue size exceeded: " + clientOptions.getRequestQueueSize()
+ ". Commands are not accepted until the queue size drops.");
}

if (disconnectedBuffer.size() + 1 > clientOptions.getRequestQueueSize()) {
if (!connected && disconnectedBuffer.size() + 1 > clientOptions.getRequestQueueSize()) {
throw new RedisException("Request queue size exceeded: " + clientOptions.getRequestQueueSize()
+ ". Commands are not accepted until the queue size drops.");
}

if (commandBuffer.size() + 1 > clientOptions.getRequestQueueSize()) {
if (connected && commandBuffer.size() + 1 > clientOptions.getRequestQueueSize()) {
throw new RedisException("Command buffer size exceeded: " + clientOptions.getRequestQueueSize()
+ ". Commands are not accepted until the queue size drops.");
}
Expand Down Expand Up @@ -759,7 +761,11 @@ private void validateWrite(int commands) {

if (usesBoundedQueues()) {

if (stack.size() + commands > clientOptions.getRequestQueueSize())
// number of maintenance commands (AUTH, CLIENT SETNAME, SELECT, READONLY) should be allowed on top
// of number of user commands to ensure the driver recovers properly from a disconnect
int maxMaintenanceCommands = 5;
int allowedRequestQueueSize = clientOptions.getRequestQueueSize() + maxMaintenanceCommands;
if (stack.size() + commands > allowedRequestQueueSize)
throw new RedisException("Internal stack size exceeded: " + clientOptions.getRequestQueueSize()
+ ". Commands are not accepted until the stack size drops.");
}
Expand Down
64 changes: 62 additions & 2 deletions src/test/java/com/lambdaworks/redis/ClientOptionsTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2017 the original author or authors.
* Copyright 2011-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,13 +29,13 @@
import java.util.Queue;
import java.util.concurrent.TimeUnit;

import com.lambdaworks.redis.codec.StringCodec;
import org.junit.Test;

import com.lambdaworks.Wait;
import com.lambdaworks.redis.api.StatefulRedisConnection;
import com.lambdaworks.redis.api.async.RedisAsyncCommands;
import com.lambdaworks.redis.api.sync.RedisCommands;
import com.lambdaworks.redis.codec.StringCodec;
import com.lambdaworks.redis.codec.Utf8StringCodec;
import com.lambdaworks.redis.output.StatusOutput;
import com.lambdaworks.redis.protocol.*;
Expand Down Expand Up @@ -119,6 +119,66 @@ public void requestQueueSizeAppliedForReconnect() {
client.setOptions(ClientOptions.builder().requestQueueSize(10).build());

RedisAsyncCommands<String, String> connection = client.connect().async();
testHitRequestQueueLimit(connection);
}

@Test
public void testHitRequestQueueLimitReconnectWithAuthCommand() {

new WithPasswordRequired() {

@Override
protected void run(RedisClient client) {

client.setOptions(ClientOptions.builder().requestQueueSize(10).build());

RedisAsyncCommands<String, String> connection = client.connect().async();
connection.auth(passwd);
testHitRequestQueueLimit(connection);
}
};
}

@Test
public void testHitRequestQueueLimitReconnectWithUriAuth() {

new WithPasswordRequired() {

@Override
protected void run(RedisClient client) {

client.setOptions(ClientOptions.builder().requestQueueSize(10).build());

RedisURI redisURI = RedisURI.create(host, port);
redisURI.setPassword(passwd);

RedisAsyncCommands<String, String> connection = client.connect(redisURI).async();
testHitRequestQueueLimit(connection);
}
};
}

@Test
public void testHitRequestQueueLimitReconnectWithUriAuthPingCommand() {

new WithPasswordRequired() {

@Override
protected void run(RedisClient client) {

client.setOptions(ClientOptions.builder().requestQueueSize(10).pingBeforeActivateConnection(true).build());

RedisURI redisURI = RedisURI.create(host, port);
redisURI.setPassword(passwd);

RedisAsyncCommands<String, String> connection = client.connect(redisURI).async();
testHitRequestQueueLimit(connection);
}
};
}

private void testHitRequestQueueLimit(RedisAsyncCommands<String, String> connection) {

ConnectionWatchdog watchdog = getConnectionWatchdog(connection.getStatefulConnection());

watchdog.setListenOnChannelInactive(false);
Expand Down
1 change: 0 additions & 1 deletion src/test/resources/log4j2-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
<Logger name="com.lambdaworks.redis" level="INFO"/>
<Logger name="com.lambdaworks.redis.cluster" level="INFO"/>
<Logger name="com.lambdaworks.redis.protocol" level="INFO"/>
<Logger name="io.netty" level="INFO"/>
<Root level="INFO">
<AppenderRef ref="Console"/>
<AppenderRef ref="File"/>
Expand Down

0 comments on commit 0bb8103

Please sign in to comment.