Skip to content

Commit

Permalink
Cancel overcommitted commands #616
Browse files Browse the repository at this point in the history
Lettuce now cancels overcommitted commands that exceed the queue size. Overcommitted commands can happen if commands were written to the protocol stack and a disconnect attempts to copy these commands to a smaller disconnected buffer.
  • Loading branch information
mp911de committed Oct 6, 2017
1 parent 6d8c9e9 commit 21056f6
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 67 deletions.
19 changes: 17 additions & 2 deletions src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -924,9 +924,24 @@ private void rebuildQueue() {
drainCommands(stack, queuedCommands);
drainCommands(disconnectedBuffer, queuedCommands);

disconnectedBuffer.addAll(queuedCommands);
try {
disconnectedBuffer.addAll(queuedCommands);
} catch (RuntimeException e) {

if (debugEnabled) {
logger.debug("{} rebuildQueue Queue overcommit. Cannot add all commands to buffer (disconnected).",
logPrefix(), queuedCommands.size());
}
queuedCommands.removeAll(disconnectedBuffer);

logger.debug("{} rebuildQueue {} command(s) added to buffer", logPrefix(), disconnectedBuffer.size());
for (RedisCommand<?, ?, ?> command : queuedCommands) {
command.completeExceptionally(e);
}
}

if (debugEnabled) {
logger.debug("{} rebuildQueue {} command(s) added to buffer", logPrefix(), disconnectedBuffer.size());
}
}

private void activateCommandHandlerAndExecuteBufferedCommands(ChannelHandlerContext ctx) {
Expand Down
48 changes: 44 additions & 4 deletions src/test/java/com/lambdaworks/Connections.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2016 the original author or authors.
* Copyright 2011-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,11 +15,17 @@
*/
package com.lambdaworks;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Queue;

import org.springframework.test.util.ReflectionTestUtils;

import com.lambdaworks.redis.RedisChannelHandler;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.StatefulRedisConnectionImpl;
import com.lambdaworks.redis.api.StatefulConnection;
import com.lambdaworks.redis.api.StatefulRedisConnection;
import com.lambdaworks.redis.api.async.RedisAsyncCommands;
import com.lambdaworks.redis.protocol.ConnectionWatchdog;

Expand All @@ -30,11 +36,45 @@
*/
public class Connections {

public static <K, V> RedisChannelHandler<K, V> getRedisChannelHandler(RedisConnection<K, V> sync) {

InvocationHandler invocationHandler = Proxy.getInvocationHandler(sync);
return (RedisChannelHandler<K, V>) ReflectionTestUtils.getField(invocationHandler, "connection");
}

public static <T> T getHandler(Class<T> handlerType, RedisChannelHandler<?, ?> channelHandler) {
Channel channel = getChannel(channelHandler);
return (T) channel.pipeline().get((Class) handlerType);
}

public static Channel getChannel(RedisChannelHandler<?, ?> channelHandler) {
return (Channel) ReflectionTestUtils.getField(channelHandler.getChannelWriter(), "channel");
}

public static Queue<?> getDisconnectedBuffer(RedisChannelHandler<?, ?> channelHandler) {
return (Queue<?>) ReflectionTestUtils.getField(channelHandler.getChannelWriter(), "disconnectedBuffer");
}

public static Queue<?> getCommandBuffer(RedisChannelHandler<?, ?> channelHandler) {
return (Queue<?>) ReflectionTestUtils.getField(channelHandler.getChannelWriter(), "commandBuffer");
}

public static Queue<Object> getStack(StatefulRedisConnection<?, ?> connection) {
return getStack((RedisChannelHandler) connection);
}

public static Queue<Object> getStack(RedisChannelHandler<?, ?> channelHandler) {
return (Queue<Object>) ReflectionTestUtils.getField(channelHandler.getChannelWriter(), "stack");
}

public static String getConnectionState(RedisChannelHandler<?, ?> channelHandler) {
return ReflectionTestUtils.getField(channelHandler.getChannelWriter(), "lifecycleState").toString();
}

public static Channel getChannel(StatefulConnection<?, ?> connection) {
RedisChannelHandler<?, ?> channelHandler = (RedisChannelHandler<?, ?>) connection;

Channel channel = (Channel) ReflectionTestUtils.getField(channelHandler.getChannelWriter(), "channel");
return channel;
RedisChannelHandler<?, ?> channelHandler = (RedisChannelHandler<?, ?>) connection;
return (Channel) ReflectionTestUtils.getField(channelHandler.getChannelWriter(), "channel");
}

public static ConnectionWatchdog getConnectionWatchdog(StatefulConnection<?, ?> connection) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2016 the original author or authors.
* Copyright 2011-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -82,14 +82,16 @@ public void closeConnection() throws Exception {
public abstract class WithPasswordRequired {
protected abstract void run(RedisClient client) throws Exception;

public WithPasswordRequired() throws Exception {
public WithPasswordRequired() {
try {
redis.configSet("requirepass", passwd);
redis.auth(passwd);

RedisClient client = newRedisClient();
try {
run(client);
} catch (Exception e) {
throw new IllegalStateException(e);
} finally {
FastShutdown.shutdown(client);
}
Expand Down
110 changes: 91 additions & 19 deletions src/test/java/com/lambdaworks/redis/ClientOptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@

import static com.lambdaworks.Connections.getChannel;
import static com.lambdaworks.Connections.getConnectionWatchdog;
import static com.lambdaworks.Connections.getStack;
import static com.lambdaworks.Connections.getStatefulConnection;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;

import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

import com.lambdaworks.redis.codec.StringCodec;
import org.junit.Test;

import com.lambdaworks.Wait;
Expand All @@ -42,17 +48,17 @@
public class ClientOptionsTest extends AbstractRedisClientTest {

@Test
public void testNew() throws Exception {
public void testNew() {
checkAssertions(ClientOptions.create());
}

@Test
public void testBuilder() throws Exception {
public void testBuilder() {
checkAssertions(ClientOptions.builder().build());
}

@Test
public void testCopy() throws Exception {
public void testCopy() {
checkAssertions(ClientOptions.copyOf(ClientOptions.builder().build()));
}

Expand All @@ -65,7 +71,7 @@ protected void checkAssertions(ClientOptions sut) {
}

@Test
public void variousClientOptions() throws Exception {
public void variousClientOptions() {

RedisAsyncCommands<String, String> plain = client.connect().async();

Expand All @@ -82,7 +88,7 @@ public void variousClientOptions() throws Exception {
}

@Test
public void requestQueueSize() throws Exception {
public void requestQueueSize() {

client.setOptions(ClientOptions.builder().requestQueueSize(10).build());

Expand All @@ -108,7 +114,73 @@ public void requestQueueSize() throws Exception {
}

@Test
public void disconnectedWithoutReconnect() throws Exception {
public void requestQueueSizeAppliedForReconnect() {

client.setOptions(ClientOptions.builder().requestQueueSize(10).build());

RedisAsyncCommands<String, String> connection = client.connect().async();
ConnectionWatchdog watchdog = getConnectionWatchdog(connection.getStatefulConnection());

watchdog.setListenOnChannelInactive(false);

connection.quit();

Wait.untilTrue(() -> !connection.getStatefulConnection().isOpen()).waitOrTimeout();

List<RedisFuture<String>> pings = new ArrayList<>();
for (int i = 0; i < 10; i++) {
pings.add(connection.ping());
}

watchdog.setListenOnChannelInactive(true);
watchdog.scheduleReconnect();

for (RedisFuture<String> ping : pings) {
assertThat(ping.toCompletableFuture().join()).isEqualTo("PONG");
}

connection.getStatefulConnection().close();
}

@Test
public void requestQueueSizeOvercommittedReconnect() throws Exception {

client.setOptions(ClientOptions.builder().requestQueueSize(10).build());

StatefulRedisConnection<String, String> connection = client.connect();
ConnectionWatchdog watchdog = getConnectionWatchdog(connection);

watchdog.setListenOnChannelInactive(false);

Queue<Object> buffer = getStack(connection);
List<RedisFuture<String>> pings = new ArrayList<>();
for (int i = 0; i < 11; i++) {

AsyncCommand<String, String, String> command = new AsyncCommand<>(new Command<>(CommandType.PING,
new StatusOutput<>(StringCodec.UTF8)));
pings.add(command);
buffer.add(command);
}

getChannel(connection).disconnect();

Wait.untilTrue(() -> !connection.isOpen()).waitOrTimeout();

watchdog.setListenOnChannelInactive(true);
watchdog.scheduleReconnect();

for (int i = 0; i < 10; i++) {
assertThat(pings.get(i).get()).isEqualTo("PONG");
}

assertThatThrownBy(() -> pings.get(10).toCompletableFuture().join()).hasCauseInstanceOf(IllegalStateException.class)
.hasMessage("java.lang.IllegalStateException: Queue full");

connection.close();
}

@Test
public void disconnectedWithoutReconnect() {

client.setOptions(ClientOptions.builder().autoReconnect(false).build());

Expand All @@ -126,7 +198,7 @@ public void disconnectedWithoutReconnect() throws Exception {
}

@Test
public void disconnectedRejectCommands() throws Exception {
public void disconnectedRejectCommands() {

client.setOptions(ClientOptions.builder().disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS)
.build());
Expand All @@ -146,7 +218,7 @@ public void disconnectedRejectCommands() throws Exception {
}

@Test
public void disconnectedAcceptCommands() throws Exception {
public void disconnectedAcceptCommands() {

client.setOptions(ClientOptions.builder().autoReconnect(false)
.disconnectedBehavior(ClientOptions.DisconnectedBehavior.ACCEPT_COMMANDS).build());
Expand All @@ -160,7 +232,7 @@ public void disconnectedAcceptCommands() throws Exception {
}

@Test(timeout = 10000)
public void pingBeforeConnect() throws Exception {
public void pingBeforeConnect() {

redis.set(key, value);
client.setOptions(ClientOptions.builder().pingBeforeActivateConnection(true).build());
Expand Down Expand Up @@ -195,11 +267,11 @@ public void pingBeforeConnectTimeout() throws Exception {
}

@Test
public void pingBeforeConnectWithAuthentication() throws Exception {
public void pingBeforeConnectWithAuthentication() {

new WithPasswordRequired() {
@Override
protected void run(RedisClient client) throws Exception {
protected void run(RedisClient client) {

client.setOptions(ClientOptions.builder().pingBeforeActivateConnection(true).build());
RedisURI redisURI = RedisURI.Builder.redis(host, port).withPassword(passwd).build();
Expand All @@ -218,7 +290,7 @@ protected void run(RedisClient client) throws Exception {
}

@Test(timeout = 2000)
public void pingBeforeConnectWithAuthenticationTimeout() throws Exception {
public void pingBeforeConnectWithAuthenticationTimeout() {

new WithPasswordRequired() {
@Override
Expand All @@ -244,11 +316,11 @@ protected void run(RedisClient client) throws Exception {
}

@Test
public void pingBeforeConnectWithSslAndAuthentication() throws Exception {
public void pingBeforeConnectWithSslAndAuthentication() {

new WithPasswordRequired() {
@Override
protected void run(RedisClient client) throws Exception {
protected void run(RedisClient client) {

client.setOptions(ClientOptions.builder().pingBeforeActivateConnection(true).build());
RedisURI redisURI = RedisURI.Builder.redis(host, 6443).withPassword(passwd).withVerifyPeer(false).withSsl(true)
Expand All @@ -268,11 +340,11 @@ protected void run(RedisClient client) throws Exception {
}

@Test
public void pingBeforeConnectWithAuthenticationFails() throws Exception {
public void pingBeforeConnectWithAuthenticationFails() {

new WithPasswordRequired() {
@Override
protected void run(RedisClient client) throws Exception {
protected void run(RedisClient client) {

client.setOptions(ClientOptions.builder().pingBeforeActivateConnection(true).build());
RedisURI redisURI = RedisURI.builder().redis(host, port).build();
Expand All @@ -288,11 +360,11 @@ protected void run(RedisClient client) throws Exception {
}

@Test
public void pingBeforeConnectWithSslAndAuthenticationFails() throws Exception {
public void pingBeforeConnectWithSslAndAuthenticationFails() {

new WithPasswordRequired() {
@Override
protected void run(RedisClient client) throws Exception {
protected void run(RedisClient client) {

client.setOptions(ClientOptions.builder().pingBeforeActivateConnection(true).build());
RedisURI redisURI = RedisURI.builder().redis(host, 6443).withVerifyPeer(false).withSsl(true).build();
Expand Down Expand Up @@ -351,7 +423,7 @@ public void pingBeforeConnectWithQueuedCommandsAndReconnect() throws Exception {
}

@Test(timeout = 10000)
public void authenticatedPingBeforeConnectWithQueuedCommandsAndReconnect() throws Exception {
public void authenticatedPingBeforeConnectWithQueuedCommandsAndReconnect() {

new WithPasswordRequired() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
return null;
}
}).when(channelHandler).activated();
when(channel.isActive()).thenReturn(true);

sut.channelRegistered(context);
sut.channelActive(context);
Expand Down Expand Up @@ -292,7 +291,6 @@ public void testIOExceptionChannelActive() throws Exception {
@Test
public void testWriteChannelDisconnected() throws Exception {

when(channel.isActive()).thenReturn(true);
sut.channelRegistered(context);
sut.channelActive(context);

Expand All @@ -310,7 +308,6 @@ public void testWriteChannelDisconnectedWithoutReconnect() throws Exception {
sut = new CommandHandler<>(ClientOptions.builder().autoReconnect(false).build(), clientResources);
sut.setRedisChannelHandler(channelHandler);

when(channel.isActive()).thenReturn(true);
sut.channelRegistered(context);
sut.channelActive(context);

Expand Down
Loading

0 comments on commit 21056f6

Please sign in to comment.