diff --git a/lettuce/src/main/java/com/lambdaworks/redis/RedisAsyncConnectionImpl.java b/lettuce/src/main/java/com/lambdaworks/redis/RedisAsyncConnectionImpl.java index 4e8e1ec82d..32b671bf77 100644 --- a/lettuce/src/main/java/com/lambdaworks/redis/RedisAsyncConnectionImpl.java +++ b/lettuce/src/main/java/com/lambdaworks/redis/RedisAsyncConnectionImpl.java @@ -200,6 +200,11 @@ public RedisFuture debugObject(K key) { return dispatch(commandBuilder.debugObject(key)); } + @Override + public void debugSegfault() { + dispatch(commandBuilder.debugSegfault()); + } + @Override public RedisFuture decr(K key) { return dispatch(commandBuilder.decr(key)); diff --git a/lettuce/src/main/java/com/lambdaworks/redis/RedisCommandBuilder.java b/lettuce/src/main/java/com/lambdaworks/redis/RedisCommandBuilder.java index a77e9919ce..6daecd711e 100644 --- a/lettuce/src/main/java/com/lambdaworks/redis/RedisCommandBuilder.java +++ b/lettuce/src/main/java/com/lambdaworks/redis/RedisCommandBuilder.java @@ -162,6 +162,11 @@ public Command debugObject(K key) { return createCommand(DEBUG, new StatusOutput(codec), args); } + public Command debugSegfault() { + CommandArgs args = new CommandArgs(codec).add(SEGFAULT); + return createCommand(DEBUG, new VoidOutput(codec), args); + } + public Command decr(K key) { return createCommand(DECR, new IntegerOutput(codec), key); } diff --git a/lettuce/src/main/java/com/lambdaworks/redis/RedisServerAsyncConnection.java b/lettuce/src/main/java/com/lambdaworks/redis/RedisServerAsyncConnection.java index 155eefdb72..22049c5bb6 100644 --- a/lettuce/src/main/java/com/lambdaworks/redis/RedisServerAsyncConnection.java +++ b/lettuce/src/main/java/com/lambdaworks/redis/RedisServerAsyncConnection.java @@ -38,6 +38,8 @@ public interface RedisServerAsyncConnection { RedisFuture debugObject(K key); + void debugSegfault(); + RedisFuture flushall(); RedisFuture flushdb(); diff --git a/lettuce/src/main/java/com/lambdaworks/redis/RedisServerConnection.java b/lettuce/src/main/java/com/lambdaworks/redis/RedisServerConnection.java index 042bf7e1b3..699af35fa3 100644 --- a/lettuce/src/main/java/com/lambdaworks/redis/RedisServerConnection.java +++ b/lettuce/src/main/java/com/lambdaworks/redis/RedisServerConnection.java @@ -38,6 +38,8 @@ public interface RedisServerConnection { String debugObject(K key); + void debugSegfault(); + String flushall(); String flushdb(); diff --git a/lettuce/src/main/java/com/lambdaworks/redis/output/VoidOutput.java b/lettuce/src/main/java/com/lambdaworks/redis/output/VoidOutput.java new file mode 100644 index 0000000000..16de358a8f --- /dev/null +++ b/lettuce/src/main/java/com/lambdaworks/redis/output/VoidOutput.java @@ -0,0 +1,32 @@ +// Copyright (C) 2011 - Will Glozer. All rights reserved. + +package com.lambdaworks.redis.output; + +import java.nio.ByteBuffer; + +import com.lambdaworks.redis.codec.RedisCodec; +import com.lambdaworks.redis.protocol.CommandOutput; + +/** + * Void (empty) output. + * + * @param Key type. + * @param Value type. + * @author Mark Paluch + */ +public class VoidOutput extends CommandOutput { + + public VoidOutput(RedisCodec codec) { + super(codec, null); + } + + @Override + public void set(ByteBuffer bytes) { + + } + + @Override + public void set(long integer) { + + } +} diff --git a/lettuce/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java b/lettuce/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java index 978a460598..7a332dd322 100644 --- a/lettuce/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java +++ b/lettuce/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java @@ -2,10 +2,17 @@ package com.lambdaworks.redis.protocol; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; + import com.lambdaworks.redis.RedisChannelHandler; import com.lambdaworks.redis.RedisCommandInterruptedException; import com.lambdaworks.redis.RedisException; import com.lambdaworks.redis.internal.RedisChannelWriter; +import com.lambdaworks.redis.output.VoidOutput; + import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; @@ -15,11 +22,6 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; - /** * A netty {@link ChannelHandler} responsible for writing redis commands and reading responses from the server. * @@ -54,7 +56,7 @@ public CommandHandler(BlockingQueue> queue) { */ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { - buffer = ctx.alloc().heapBuffer(); + buffer = ctx.alloc().heapBuffer(); rsm = new RedisStateMachine(); } @@ -64,33 +66,28 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception { */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - ByteBuf input = (ByteBuf) msg; - try - { + ByteBuf input = (ByteBuf) msg; + try { if (!input.isReadable() || input.refCnt() == 0) { - return; + return; } - synchronized (lock) - { + synchronized (lock) { if (buffer == null) { logger.warn("CommandHandler is closed, incoming response will be discarded."); return; } buffer.writeBytes(input); - if (logger.isDebugEnabled()) - { + if (logger.isDebugEnabled()) { logger.debug("[" + channel.remoteAddress() + "] Received: " + buffer.toString(Charset.defaultCharset()).trim()); } decode(ctx, buffer); - } + } - } - finally - { + } finally { input.release(); } } @@ -100,8 +97,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup while (!queue.isEmpty() && rsm.decode(buffer, queue.peek(), queue.peek().getOutput())) { RedisCommand cmd = queue.take(); cmd.complete(); - if (buffer != null && buffer.refCnt() != 0) - { + if (buffer != null && buffer.refCnt() != 0) { buffer.discardReadBytes(); } } @@ -126,24 +122,27 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E */ @Override @SuppressWarnings("unchecked") - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception - { + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { final RedisCommand cmd = (RedisCommand) msg; ByteBuf buf = ctx.alloc().heapBuffer(); - cmd.encode(buf); - if (logger.isDebugEnabled()) - { + cmd.encode(buf); + if (logger.isDebugEnabled()) { logger.debug("[" + channel.remoteAddress() + "] Sent: " + buf.toString(Charset.defaultCharset()).trim()); - } + } + + synchronized (queue) { - synchronized (queue) - { - if (!queue.contains(cmd)) - { - queue.put(cmd); + if (cmd.getOutput() instanceof VoidOutput) { + queue.remove(cmd); + ctx.write(buf, promise); + cmd.complete(); + } else { + if (!queue.contains(cmd)) { + queue.put(cmd); + } + ctx.write(buf, promise); } - ctx.write(buf, promise); } } @@ -152,8 +151,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) * @see io.netty.channel.ChannelInboundHandlerAdapter#channelActive(io.netty.channel.ChannelHandlerContext) */ @Override - public void channelActive(final ChannelHandlerContext ctx) throws Exception - { + public void channelActive(final ChannelHandlerContext ctx) throws Exception { logger.debug("channelActive()"); this.channel = ctx.channel(); @@ -163,39 +161,33 @@ public void channelActive(final ChannelHandlerContext ctx) throws Exception tmp.addAll(queue); queue.clear(); - if (redisChannelHandler != null) - { + if (redisChannelHandler != null) { redisChannelHandler.activated(); } for (RedisCommand cmd : tmp) { - if (!cmd.isCancelled()) - { + if (!cmd.isCancelled()) { logger.debug("Triggering command " + cmd); ctx.channel().writeAndFlush(cmd); } } - tmp.clear(); + tmp.clear(); } /** - * + * * @see io.netty.channel.ChannelInboundHandlerAdapter#channelInactive(io.netty.channel.ChannelHandlerContext) */ @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception - { + public void channelInactive(ChannelHandlerContext ctx) throws Exception { logger.debug("channelInactive()"); - try - { + try { this.channel = null; if (closed) { - for (RedisCommand cmd : queue) - { - if (cmd.getOutput() != null) - { + for (RedisCommand cmd : queue) { + if (cmd.getOutput() != null) { cmd.getOutput().setError("Connection closed"); } cmd.complete(); @@ -203,13 +195,11 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception queue.clear(); queue = null; - if (redisChannelHandler != null) - { + if (redisChannelHandler != null) { redisChannelHandler.deactivated(); } } - } - catch (RuntimeException e) { + } catch (RuntimeException e) { logger.error(e.getMessage(), e); throw e; } @@ -219,24 +209,21 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception public RedisCommand write(RedisCommand command) { try { - if (closed) - { + if (closed) { throw new RedisException("Connection is closed"); } if (channel != null) { channel.writeAndFlush(command); } else { - synchronized (queue) - { + synchronized (queue) { queue.put(command); } } - } catch (NullPointerException e) - { + } catch (NullPointerException e) { throw new RedisException("Connection is closed"); } catch (InterruptedException e) { - throw new RedisCommandInterruptedException(e); + throw new RedisCommandInterruptedException(e); } return command; @@ -246,8 +233,7 @@ public RedisCommand write(RedisCommand command) { * Close the connection. */ @Override - public synchronized void close() - { + public synchronized void close() { logger.debug("close()"); if (closed) { @@ -255,36 +241,31 @@ public synchronized void close() return; } - if (buffer != null) - { - synchronized (lock) - { + if (buffer != null) { + synchronized (lock) { buffer.release(); } buffer = null; } - if (!closed && channel != null) { + if (!closed && channel != null) { ConnectionWatchdog watchdog = channel.pipeline().get(ConnectionWatchdog.class); if (watchdog != null) { watchdog.setReconnect(false); } closed = true; - try - { + try { channel.close().sync(); - } catch (InterruptedException e) - { + } catch (InterruptedException e) { throw new RedisException(e); } channel = null; - } + } } - public boolean isClosed() - { + public boolean isClosed() { return closed; } diff --git a/lettuce/src/main/java/com/lambdaworks/redis/protocol/CommandKeyword.java b/lettuce/src/main/java/com/lambdaworks/redis/protocol/CommandKeyword.java index 38f4f2ae2c..b56bbdb3ce 100644 --- a/lettuce/src/main/java/com/lambdaworks/redis/protocol/CommandKeyword.java +++ b/lettuce/src/main/java/com/lambdaworks/redis/protocol/CommandKeyword.java @@ -8,7 +8,7 @@ * @author Will Glozer */ public enum CommandKeyword { - ADDSLOTS, AFTER, AGGREGATE, ALPHA, AND, ASC, BEFORE, BY, COUNT, DELSLOTS, DESC, SOFT, HARD, ENCODING, FAILOVER, FORGET, FLUSH, FORCE, FLUSHSLOTS, GETNAME, GETKEYSINSLOT, IDLETIME, KILL, LEN, LIMIT, LIST, LOAD, MATCH, MAX, MEET, MIN, MOVED, NO, NODE, NODES, NOSAVE, NOT, ONE, OR, PAUSE, REFCOUNT, REPLICATE, RESET, REWRITE, RESETSTAT, SETNAME, SETSLOT, MIGRATING, IMPORTING, SLAVES, STORE, SUM, WEIGHTS, WITHSCORES, XOR, REMOVE; + ADDSLOTS, AFTER, AGGREGATE, ALPHA, AND, ASC, BEFORE, BY, COUNT, DELSLOTS, DESC, SOFT, HARD, ENCODING, FAILOVER, FORGET, FLUSH, FORCE, FLUSHSLOTS, GETNAME, GETKEYSINSLOT, IDLETIME, KILL, LEN, LIMIT, LIST, LOAD, MATCH, MAX, MEET, MIN, MOVED, NO, NODE, NODES, NOSAVE, NOT, ONE, OR, PAUSE, REFCOUNT, REPLICATE, RESET, REWRITE, RESETSTAT, SETNAME, SETSLOT, MIGRATING, IMPORTING, SLAVES, STORE, SUM, SEGFAULT, WEIGHTS, WITHSCORES, XOR, REMOVE; public byte[] bytes; diff --git a/lettuce/src/test/java/com/lambdaworks/redis/ServerCommandTest.java b/lettuce/src/test/java/com/lambdaworks/redis/ServerCommandTest.java index 691ae18c7d..83c5596ce9 100644 --- a/lettuce/src/test/java/com/lambdaworks/redis/ServerCommandTest.java +++ b/lettuce/src/test/java/com/lambdaworks/redis/ServerCommandTest.java @@ -13,6 +13,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.junit.Ignore; import org.junit.Test; public class ServerCommandTest extends AbstractCommandTest { @@ -96,6 +97,12 @@ public void debugObject() throws Exception { redis.debugObject(key); } + @Test + @Ignore("This test will kill your redis server, therefore it's disabled by default") + public void debugSegfault() throws Exception { + redis.debugSegfault(); + } + @Test public void flushall() throws Exception { redis.set(key, value);