diff --git a/src/main/java/com/lambdaworks/redis/protocol/ChannelLogDescriptor.java b/src/main/java/com/lambdaworks/redis/protocol/ChannelLogDescriptor.java index 1d73915c31..b1a1e334cc 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/ChannelLogDescriptor.java +++ b/src/main/java/com/lambdaworks/redis/protocol/ChannelLogDescriptor.java @@ -23,7 +23,11 @@ static String logDescriptor(Channel channel) { } if (!channel.isActive()) { - buffer.append(" (inactive)"); + if(buffer.length() != 0){ + buffer.append(' '); + } + + buffer.append("(inactive)"); } return buffer.toString(); diff --git a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java index 10fc41b0fb..39262c6640 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java +++ b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java @@ -9,6 +9,7 @@ import java.nio.channels.ClosedChannelException; import java.nio.charset.Charset; import java.util.*; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import com.google.common.collect.ImmutableList; @@ -22,6 +23,7 @@ import com.lambdaworks.redis.resource.ClientResources; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; @@ -48,6 +50,7 @@ public class CommandHandler extends ChannelDuplexHandler implements RedisC private static final InternalLogger logger = InternalLoggerFactory.getInstance(CommandHandler.class); private static final WriteLogListener WRITE_LOG_LISTENER = new WriteLogListener(); + private static final AtomicLong CHANNEL_COUNTER = new AtomicLong(); /** * When we encounter an unexpected IOException we look for these {@link Throwable#getMessage() messages} (because we have no @@ -57,6 +60,7 @@ public class CommandHandler extends ChannelDuplexHandler implements RedisC private static final Set SUPPRESS_IO_EXCEPTION_MESSAGES = ImmutableSet.of("Connection reset by peer", "Broken pipe", "Connection timed out"); + protected final long commandHandlerId = CHANNEL_COUNTER.incrementAndGet(); protected final ClientOptions clientOptions; protected final ClientResources clientResources; protected final Queue> queue; @@ -65,9 +69,9 @@ public class CommandHandler extends ChannelDuplexHandler implements RedisC // all access to the commandBuffer is synchronized protected volatile Deque> commandBuffer = newCommandBuffer(); protected final Deque> transportBuffer = newCommandBuffer(); - protected ByteBuf buffer; - protected RedisStateMachine rsm; - protected Channel channel; + protected final ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(8192 * 8); + protected final RedisStateMachine rsm = new RedisStateMachine(); + protected volatile Channel channel; private volatile ConnectionWatchdog connectionWatchdog; // If TRACE level logging has been enabled at startup. @@ -111,17 +115,36 @@ public CommandHandler(ClientOptions clientOptions, ClientResources clientResourc */ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { - setState(LifecycleState.REGISTERED); - buffer = ctx.alloc().directBuffer(8192 * 8); - rsm = new RedisStateMachine(); + synchronized (stateLock) { channel = ctx.channel(); } + + if (debugEnabled) { + logPrefix = null; + logger.debug("{} channelRegistered()", logPrefix()); + } + + setState(LifecycleState.REGISTERED); + + if(buffer.refCnt() > 0) { + buffer.clear(); + } + ctx.fireChannelRegistered(); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { - releaseBuffer(); + + if (debugEnabled) { + logger.debug("{} channelUnregistered()", logPrefix()); + } + + if (channel != null && ctx.channel() != channel) { + logger.debug("{} My channel and ctx.channel mismatch. Propagating event to other listeners", logPrefix()); + ctx.fireChannelUnregistered(); + return; + } if (lifecycleState == LifecycleState.CLOSED) { cancelCommands("Connection closed"); @@ -129,6 +152,8 @@ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { synchronized (stateLock) { channel = null; } + + ctx.fireChannelUnregistered(); } /** @@ -139,15 +164,30 @@ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf input = (ByteBuf) msg; - if (!input.isReadable() || input.refCnt() == 0 || buffer == null) { + if (!input.isReadable() || input.refCnt() == 0) { + logger.warn("{} Input not readable {}, {}", logPrefix(), input.isReadable(), input.refCnt()); return; } + if(debugEnabled) { + logger.debug("{} Received: {} bytes, {} queued commands", logPrefix(), input.readableBytes(), queue.size()); + } + try { + if (buffer.refCnt() < 1) { + logger.warn("{} Ignoring received data for closed or abandoned connection", logPrefix()); + return; + } + + if (debugEnabled && ctx.channel() != channel) { + logger.debug("{} Ignoring data for a non-registered channel {}", logPrefix(), ctx.channel()); + return; + } + buffer.writeBytes(input); if (traceEnabled) { - logger.trace("{} Received: {}", logPrefix(), buffer.toString(Charset.defaultCharset()).trim()); + logger.trace("{} Buffer: {}", logPrefix(), input.toString(Charset.defaultCharset()).trim()); } decode(ctx, buffer); @@ -181,7 +221,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup command.complete(); - if (buffer != null && buffer.refCnt() != 0) { + if (buffer.refCnt() != 0) { buffer.discardReadBytes(); } } @@ -564,6 +604,12 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { logger.debug("{} channelInactive()", logPrefix()); } + if (channel != null && ctx.channel() != channel) { + logger.debug("{} My channel and ctx.channel mismatch. Propagating event to other listeners.", logPrefix()); + super.channelInactive(ctx); + return; + } + try { writeLock.lock(); setStateIfNotClosed(LifecycleState.DISCONNECTED); @@ -589,10 +635,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { writeLock.unlock(); } - if (buffer != null) { - rsm.reset(); - buffer.clear(); - } + rsm.reset(); if (debugEnabled) { logger.debug("{} channelInactive() done", logPrefix()); @@ -708,12 +751,9 @@ public void close() { } else if (connectionWatchdog != null) { connectionWatchdog.prepareClose(new ConnectionEvents.PrepareClose()); } - } - private void releaseBuffer() { - if (buffer != null) { + if (buffer.refCnt() > 0) { buffer.release(); - buffer = null; } } @@ -737,8 +777,9 @@ public void reset() { writeLock.unlock(); } - if (buffer != null) { - rsm.reset(); + rsm.reset(); + + if (buffer.refCnt() > 0) { buffer.clear(); } } @@ -777,7 +818,8 @@ protected String logPrefix() { return logPrefix; } StringBuffer buffer = new StringBuffer(64); - buffer.append('[').append(ChannelLogDescriptor.logDescriptor(channel)).append(']'); + buffer.append('[').append("chid=0x").append(Long.toHexString(commandHandlerId)).append(", ") + .append(ChannelLogDescriptor.logDescriptor(channel)).append(']'); return logPrefix = buffer.toString(); }