Skip to content

Commit

Permalink
Adjust logging when running into Exceptions #140
Browse files Browse the repository at this point in the history
- Log consumed exceptions at DEBUG level
consumed means either the exception was attached to a command or it was stored in the connection error field when disconnected
- Log timeout/connection reset/broken pipe IOExceptions on DEBUG level and log all other IOExceptions on INFO level
- Keep: Everything else is logged on WARN level
- Adopt log pattern/log behavior to write log listener
  • Loading branch information
mp911de committed Oct 3, 2015
1 parent cc656ad commit 8c6e907
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 11 deletions.
46 changes: 38 additions & 8 deletions src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import static com.google.common.base.Preconditions.checkArgument;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
Expand All @@ -13,9 +14,11 @@
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.MapMaker;
import com.lambdaworks.redis.ClientOptions;
import com.lambdaworks.redis.ConnectionEvents;
Expand All @@ -35,6 +38,7 @@
import io.netty.channel.local.LocalAddress;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.logging.InternalLogLevel;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

Expand All @@ -51,6 +55,14 @@ public class CommandHandler<K, V> extends ChannelDuplexHandler implements RedisC
private static final InternalLogger logger = InternalLoggerFactory.getInstance(CommandHandler.class);
private static final WriteLogListener WRITE_LOG_LISTENER = new WriteLogListener();

/**
* When we encounter an unexpected IOException we look for these {@link Throwable#getMessage() messages} (because we have no
* better way to distinguish) and log them at DEBUG rather than WARN, since they are generally caused by unclean client
* disconnects rather than an actual problem.
*/
private static final Set<String> SUPPRESS_IO_EXCEPTION_MESSAGES = ImmutableSet.of("Connection reset by peer",
"Broken pipe", "Connection timed out");

protected final ClientOptions clientOptions;
protected final ClientResources clientResources;
protected final Queue<RedisCommand<K, V, ?>> queue;
Expand Down Expand Up @@ -514,17 +526,16 @@ private void cancelCommands(String message) {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (debugEnabled) {
logger.debug("{} exceptionCaught()", logPrefix(), cause);
logger.debug(cause.getMessage(), cause);
}

InternalLogLevel logLevel = InternalLogLevel.WARN;

if (!queue.isEmpty()) {
RedisCommand<K, V, ?> command = queue.poll();
sentTimes.remove(command);
if (debugEnabled) {
logger.debug("{} Storing exception in {}", logPrefix(), command);
}
logLevel = InternalLogLevel.DEBUG;
command.setException(cause);
command.complete();
}
Expand All @@ -533,10 +544,18 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
if (debugEnabled) {
logger.debug("{} Storing exception in connectionError", logPrefix());
}
logLevel = InternalLogLevel.DEBUG;
connectionError = cause;
return;
}
super.exceptionCaught(ctx, cause);

if (cause instanceof IOException && logLevel.ordinal() > InternalLogLevel.INFO.ordinal()) {
logLevel = InternalLogLevel.INFO;
if (SUPPRESS_IO_EXCEPTION_MESSAGES.contains(cause.getMessage())) {
logLevel = InternalLogLevel.DEBUG;
}
}

logger.log(logLevel, "{} Unexpected exception during request: {}", logPrefix, cause.toString(), cause);
}

/**
Expand Down Expand Up @@ -675,10 +694,21 @@ public void operationComplete(ChannelFuture future) throws Exception {
*
*/
static class WriteLogListener implements GenericFutureListener<Future<Void>> {

@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess() && !(future.cause() instanceof ClosedChannelException))
logger.warn(future.cause().getMessage(), future.cause());
Throwable cause = future.cause();
if (!future.isSuccess() && !(cause instanceof ClosedChannelException)) {

String message = "Unexpected exception during request: {}";
InternalLogLevel logLevel = InternalLogLevel.WARN;

if (cause instanceof IOException && SUPPRESS_IO_EXCEPTION_MESSAGES.contains(cause.getMessage())) {
logLevel = InternalLogLevel.DEBUG;
}

logger.log(logLevel, message, cause.toString(), cause);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -104,7 +105,16 @@ public void testExceptionChannelActive() throws Exception {

sut.channelActive(context);
sut.exceptionCaught(context, new Exception());
verify(context).fireExceptionCaught(any(Exception.class));
}

@Test
public void testIOExceptionChannelActive() throws Exception {
sut.setState(CommandHandler.LifecycleState.ACTIVE);

when(channel.isActive()).thenReturn(true);

sut.channelActive(context);
sut.exceptionCaught(context, new IOException("Connection timed out"));
}

@Test
Expand All @@ -127,8 +137,6 @@ public void testExceptionWithQueue() throws Exception {

assertThat(q).isEmpty();
assertThat(command.getException()).isNotNull();

verify(context).fireExceptionCaught(any(Exception.class));
}

@Test(expected = RedisException.class)
Expand Down

0 comments on commit 8c6e907

Please sign in to comment.