Skip to content

Commit

Permalink
Extend request command limits to accept connection activation commands
Browse files Browse the repository at this point in the history
…#691

Request queue size checks now consider an additional number of connection activation commands.
  • Loading branch information
mp911de committed Feb 16, 2018
1 parent 014fca1 commit dbfa192
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 3 deletions.
7 changes: 6 additions & 1 deletion src/main/java/io/lettuce/core/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,12 @@ private void validateWrite(int commands) {

if (usesBoundedQueues()) {

if (stack.size() + commands > clientOptions.getRequestQueueSize())
// number of maintenance commands (AUTH, CLIENT SETNAME, SELECT, READONLY) should be allowed on top
// of number of user commands to ensure the driver recovers properly from a disconnect
int maxMaintenanceCommands = 5;
int allowedRequestQueueSize = clientOptions.getRequestQueueSize() + maxMaintenanceCommands;
if (stack.size() + commands > allowedRequestQueueSize)

throw new RedisException("Internal stack size exceeded: " + clientOptions.getRequestQueueSize()
+ ". Commands are not accepted until the stack size drops.");
}
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,19 @@ private void validateWrite(int commands) {

if (usesBoundedQueues()) {

boolean connected = isConnected();

if (QUEUE_SIZE.get(this) + commands > clientOptions.getRequestQueueSize()) {
throw new RedisException("Request queue size exceeded: " + clientOptions.getRequestQueueSize()
+ ". Commands are not accepted until the queue size drops.");
}

if (disconnectedBuffer.size() + commands > clientOptions.getRequestQueueSize()) {
if (!connected && disconnectedBuffer.size() + commands > clientOptions.getRequestQueueSize()) {
throw new RedisException("Request queue size exceeded: " + clientOptions.getRequestQueueSize()
+ ". Commands are not accepted until the queue size drops.");
}

if (commandBuffer.size() + commands > clientOptions.getRequestQueueSize()) {
if (connected && commandBuffer.size() + commands > clientOptions.getRequestQueueSize()) {
throw new RedisException("Command buffer size exceeded: " + clientOptions.getRequestQueueSize()
+ ". Commands are not accepted until the queue size drops.");
}
Expand Down
60 changes: 60 additions & 0 deletions src/test/java/io/lettuce/core/ClientOptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,66 @@ public void requestQueueSizeAppliedForReconnect() {
client.setOptions(ClientOptions.builder().requestQueueSize(10).build());

RedisAsyncCommands<String, String> connection = client.connect().async();
testHitRequestQueueLimit(connection);
}

@Test
public void testHitRequestQueueLimitReconnectWithAuthCommand() {

new WithPasswordRequired() {

@Override
protected void run(RedisClient client) {

client.setOptions(ClientOptions.builder().requestQueueSize(10).build());

RedisAsyncCommands<String, String> connection = client.connect().async();
connection.auth(passwd);
testHitRequestQueueLimit(connection);
}
};
}

@Test
public void testHitRequestQueueLimitReconnectWithUriAuth() {

new WithPasswordRequired() {

@Override
protected void run(RedisClient client) {

client.setOptions(ClientOptions.builder().requestQueueSize(10).build());

RedisURI redisURI = RedisURI.create(host, port);
redisURI.setPassword(passwd);

RedisAsyncCommands<String, String> connection = client.connect(redisURI).async();
testHitRequestQueueLimit(connection);
}
};
}

@Test
public void testHitRequestQueueLimitReconnectWithUriAuthPingCommand() {

new WithPasswordRequired() {

@Override
protected void run(RedisClient client) {

client.setOptions(ClientOptions.builder().requestQueueSize(10).pingBeforeActivateConnection(true).build());

RedisURI redisURI = RedisURI.create(host, port);
redisURI.setPassword(passwd);

RedisAsyncCommands<String, String> connection = client.connect(redisURI).async();
testHitRequestQueueLimit(connection);
}
};
}

private void testHitRequestQueueLimit(RedisAsyncCommands<String, String> connection) {

ConnectionWatchdog watchdog = getConnectionWatchdog(connection.getStatefulConnection());

watchdog.setListenOnChannelInactive(false);
Expand Down

0 comments on commit dbfa192

Please sign in to comment.