Skip to content

Commit

Permalink
Added locking to prevent race conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Jun 21, 2014
1 parent 807f391 commit ddce2ff
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 44 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,8 @@ start: cleanup
echo "$$REDIS1_CONF" > work/redis1-6479.conf && redis-server work/redis1-6479.conf
echo "$$REDIS2_CONF" > work/redis2-6480.conf && redis-server work/redis2-6480.conf
echo "$$REDIS3_CONF" > work/redis3-6481.conf && redis-server work/redis3-6481.conf
echo "$$REDIS4_CONF" > work/redis3-6482.conf && redis-server work/redis3-6482.conf
echo "$$REDIS5_CONF" > work/redis2-6483.conf && redis-server work/redis2-6483.conf
echo "$$REDIS4_CONF" > work/redis3-6482.conf && redis-server work/redis3-6482.conf
echo "$$REDIS5_CONF" > work/redis2-6483.conf && redis-server work/redis2-6483.conf
echo "$$REDIS6_CONF" > work/redis2-6484.conf && redis-server work/redis2-6484.conf
echo "$$REDIS7_CONF" > work/redis2-6485.conf && redis-server work/redis2-6485.conf
echo "$$REDIS_SENTINEL1" > work/sentinel1-26379.conf && redis-server work/sentinel1-26379.conf --sentinel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import com.lambdaworks.redis.RedisChannelHandler;
Expand Down Expand Up @@ -39,7 +40,7 @@ public class CommandHandler<K, V> extends ChannelDuplexHandler implements RedisC
protected BlockingQueue<RedisCommand<K, V, ?>> commandBuffer = new LinkedBlockingQueue<RedisCommand<K, V, ?>>();
protected ByteBuf buffer;
protected RedisStateMachine<K, V> rsm;
private Channel channel;
private AtomicReference<Channel> channel = new AtomicReference<Channel>();
private boolean closed;
private RedisChannelHandler<K, V> redisChannelHandler;
private final ReentrantLock writeLock = new ReentrantLock();
Expand Down Expand Up @@ -129,25 +130,25 @@ public <T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
throw new RedisException("Connection is closed");
}

if (channel != null) {
if (logger.isDebugEnabled()) {
logger.debug("writeAndFlush Command " + command);
}
channel.writeAndFlush(command);
} else {
try {
writeLock.lock();
try {
writeLock.lock();

if (channel.get() != null) {
if (logger.isDebugEnabled()) {
logger.debug("buffering Command " + command);

logger.debug("[" + this + "] write() writeAndFlush Command " + command);
}
channel.get().writeAndFlush(command);
} else {
if (logger.isDebugEnabled()) {
logger.debug("[" + this + "] write() buffering Command " + command);
}
commandBuffer.put(command);
} finally {
writeLock.unlock();
}
} finally {
writeLock.unlock();
}

} catch (NullPointerException e) {
throw new RedisException("Connection is closed");
} catch (InterruptedException e) {
throw new RedisCommandInterruptedException(e);
}
Expand Down Expand Up @@ -188,38 +189,39 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {

logger.debug("channelActive()");
logger.debug("[" + this + "] channelActive()");
List<RedisCommand<K, V, ?>> tmp = new ArrayList<RedisCommand<K, V, ?>>(queue.size() + commandBuffer.size());

try {
writeLock.lock();

this.channel = ctx.channel();
this.channel.set(ctx.channel());

tmp.addAll(queue);
tmp.addAll(commandBuffer);

queue.clear();
commandBuffer.clear();

if (redisChannelHandler != null) {
redisChannelHandler.activated();
}

} finally {
writeLock.unlock();
}

tmp.addAll(commandBuffer);
commandBuffer.clear();
for (RedisCommand<K, V, ?> cmd : tmp) {
if (!cmd.isCancelled()) {

for (RedisCommand<K, V, ?> cmd : tmp) {
if (!cmd.isCancelled()) {
logger.debug("Triggering command " + cmd);
ctx.channel().writeAndFlush(cmd);
if (logger.isDebugEnabled()) {
logger.debug("[" + this + "] channelActive() triggering command " + cmd);
}
ctx.channel().writeAndFlush(cmd);
}
}
} finally {
writeLock.unlock();
}

tmp.clear();

logger.debug("channelActive() done");
logger.debug("[" + this + "] channelActive() done");

}

Expand All @@ -229,36 +231,44 @@ public void channelActive(final ChannelHandlerContext ctx) throws Exception {
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.debug("channelInactive()");
this.channel = null;
logger.debug("[" + this + "] channelInactive()");
this.channel.set(null);

if (closed) {
for (RedisCommand<K, V, ?> cmd : queue) {

List<RedisCommand<K, V, ?>> toCancel = new ArrayList<RedisCommand<K, V, ?>>(queue.size() + commandBuffer.size());

toCancel.addAll(queue);
toCancel.addAll(commandBuffer);

queue.clear();
queue = null;

commandBuffer.clear();
commandBuffer = null;

for (RedisCommand<K, V, ?> cmd : toCancel) {
if (cmd.getOutput() != null) {
cmd.getOutput().setError("Connection closed");
}
cmd.complete();
}
queue.clear();
queue = null;

commandBuffer.clear();
queue = null;

if (redisChannelHandler != null) {
redisChannelHandler.deactivated();
}
}

logger.debug("channelInactive() done");
logger.debug("[" + this + "] channelInactive() done");
}

/**
* Close the connection.
*/
@Override
public void close() {
logger.debug("close()");

logger.debug("[" + this + "] close()");

if (closed) {
return;
Expand All @@ -277,19 +287,19 @@ public void close() {

closed = true;

if (channel != null) {
ConnectionWatchdog watchdog = channel.pipeline().get(ConnectionWatchdog.class);
if (channel.get() != null) {
ConnectionWatchdog watchdog = channel.get().pipeline().get(ConnectionWatchdog.class);
if (watchdog != null) {
watchdog.setReconnect(false);
}

try {
channel.close().sync();
channel.get().close().sync();
} catch (InterruptedException e) {
throw new RedisException(e);
}

channel = null;
channel.set(null);
}
}

Expand Down

0 comments on commit ddce2ff

Please sign in to comment.