Skip to content

Commit

Permalink
Support for
Browse files Browse the repository at this point in the history
DEBUG SEGFAULT
  • Loading branch information
mp911de committed Jun 17, 2014
1 parent c6722f5 commit a47686c
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ public RedisFuture<String> debugObject(K key) {
return dispatch(commandBuilder.debugObject(key));
}

@Override
public void debugSegfault() {
dispatch(commandBuilder.debugSegfault());
}

@Override
public RedisFuture<Long> decr(K key) {
return dispatch(commandBuilder.decr(key));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ public Command<K, V, String> debugObject(K key) {
return createCommand(DEBUG, new StatusOutput<K, V>(codec), args);
}

public Command<K, V, Void> debugSegfault() {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(SEGFAULT);
return createCommand(DEBUG, new VoidOutput<K, V>(codec), args);
}

public Command<K, V, Long> decr(K key) {
return createCommand(DECR, new IntegerOutput<K, V>(codec), key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public interface RedisServerAsyncConnection<K, V> {

RedisFuture<String> debugObject(K key);

void debugSegfault();

RedisFuture<String> flushall();

RedisFuture<String> flushdb();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public interface RedisServerConnection<K, V> {

String debugObject(K key);

void debugSegfault();

String flushall();

String flushdb();
Expand Down
32 changes: 32 additions & 0 deletions lettuce/src/main/java/com/lambdaworks/redis/output/VoidOutput.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.

package com.lambdaworks.redis.output;

import java.nio.ByteBuffer;

import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;

/**
* Void (empty) output.
*
* @param <K> Key type.
* @param <V> Value type.
* @author Mark Paluch
*/
public class VoidOutput<K, V> extends CommandOutput<K, V, Void> {

public VoidOutput(RedisCodec<K, V> codec) {
super(codec, null);
}

@Override
public void set(ByteBuffer bytes) {

}

@Override
public void set(long integer) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@

package com.lambdaworks.redis.protocol;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

import com.lambdaworks.redis.RedisChannelHandler;
import com.lambdaworks.redis.RedisCommandInterruptedException;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.internal.RedisChannelWriter;
import com.lambdaworks.redis.output.VoidOutput;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
Expand All @@ -15,11 +22,6 @@
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

/**
* A netty {@link ChannelHandler} responsible for writing redis commands and reading responses from the server.
*
Expand Down Expand Up @@ -54,7 +56,7 @@ public CommandHandler(BlockingQueue<RedisCommand<K, V, ?>> queue) {
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
buffer = ctx.alloc().heapBuffer();
buffer = ctx.alloc().heapBuffer();
rsm = new RedisStateMachine<K, V>();
}

Expand All @@ -64,33 +66,28 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf input = (ByteBuf) msg;
try
{
ByteBuf input = (ByteBuf) msg;
try {
if (!input.isReadable() || input.refCnt() == 0) {
return;
return;
}

synchronized (lock)
{
synchronized (lock) {
if (buffer == null) {
logger.warn("CommandHandler is closed, incoming response will be discarded.");
return;
}
buffer.writeBytes(input);

if (logger.isDebugEnabled())
{
if (logger.isDebugEnabled()) {
logger.debug("[" + channel.remoteAddress() + "] Received: "
+ buffer.toString(Charset.defaultCharset()).trim());
}

decode(ctx, buffer);
}
}

}
finally
{
} finally {
input.release();
}
}
Expand All @@ -100,8 +97,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup
while (!queue.isEmpty() && rsm.decode(buffer, queue.peek(), queue.peek().getOutput())) {
RedisCommand<K, V, ?> cmd = queue.take();
cmd.complete();
if (buffer != null && buffer.refCnt() != 0)
{
if (buffer != null && buffer.refCnt() != 0) {
buffer.discardReadBytes();
}
}
Expand All @@ -126,24 +122,27 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
*/
@Override
@SuppressWarnings("unchecked")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
{
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {

final RedisCommand<K, V, ?> cmd = (RedisCommand<K, V, ?>) msg;
ByteBuf buf = ctx.alloc().heapBuffer();
cmd.encode(buf);
if (logger.isDebugEnabled())
{
cmd.encode(buf);
if (logger.isDebugEnabled()) {
logger.debug("[" + channel.remoteAddress() + "] Sent: " + buf.toString(Charset.defaultCharset()).trim());
}
}

synchronized (queue) {

synchronized (queue)
{
if (!queue.contains(cmd))
{
queue.put(cmd);
if (cmd.getOutput() instanceof VoidOutput) {
queue.remove(cmd);
ctx.write(buf, promise);
cmd.complete();
} else {
if (!queue.contains(cmd)) {
queue.put(cmd);
}
ctx.write(buf, promise);
}
ctx.write(buf, promise);
}

}
Expand All @@ -152,8 +151,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
* @see io.netty.channel.ChannelInboundHandlerAdapter#channelActive(io.netty.channel.ChannelHandlerContext)
*/
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception
{
public void channelActive(final ChannelHandlerContext ctx) throws Exception {

logger.debug("channelActive()");
this.channel = ctx.channel();
Expand All @@ -163,53 +161,45 @@ public void channelActive(final ChannelHandlerContext ctx) throws Exception
tmp.addAll(queue);
queue.clear();

if (redisChannelHandler != null)
{
if (redisChannelHandler != null) {
redisChannelHandler.activated();
}

for (RedisCommand<K, V, ?> cmd : tmp) {
if (!cmd.isCancelled())
{
if (!cmd.isCancelled()) {
logger.debug("Triggering command " + cmd);
ctx.channel().writeAndFlush(cmd);
}
}

tmp.clear();
tmp.clear();

}

/**
*
*
* @see io.netty.channel.ChannelInboundHandlerAdapter#channelInactive(io.netty.channel.ChannelHandlerContext)
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
{
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.debug("channelInactive()");
try
{
try {
this.channel = null;
if (closed) {
for (RedisCommand<K, V, ?> cmd : queue)
{
if (cmd.getOutput() != null)
{
for (RedisCommand<K, V, ?> cmd : queue) {
if (cmd.getOutput() != null) {
cmd.getOutput().setError("Connection closed");
}
cmd.complete();
}
queue.clear();
queue = null;

if (redisChannelHandler != null)
{
if (redisChannelHandler != null) {
redisChannelHandler.deactivated();
}
}
}
catch (RuntimeException e) {
} catch (RuntimeException e) {
logger.error(e.getMessage(), e);
throw e;
}
Expand All @@ -219,24 +209,21 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception
public <T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
try {

if (closed)
{
if (closed) {
throw new RedisException("Connection is closed");
}

if (channel != null) {
channel.writeAndFlush(command);
} else {
synchronized (queue)
{
synchronized (queue) {
queue.put(command);
}
}
} catch (NullPointerException e)
{
} catch (NullPointerException e) {
throw new RedisException("Connection is closed");
} catch (InterruptedException e) {
throw new RedisCommandInterruptedException(e);
throw new RedisCommandInterruptedException(e);
}

return command;
Expand All @@ -246,45 +233,39 @@ public <T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
* Close the connection.
*/
@Override
public synchronized void close()
{
public synchronized void close() {
logger.debug("close()");

if (closed) {
logger.warn("Client is already closed");
return;
}

if (buffer != null)
{
synchronized (lock)
{
if (buffer != null) {
synchronized (lock) {
buffer.release();
}
buffer = null;
}

if (!closed && channel != null) {
if (!closed && channel != null) {
ConnectionWatchdog watchdog = channel.pipeline().get(ConnectionWatchdog.class);
if (watchdog != null) {
watchdog.setReconnect(false);
}
closed = true;
try
{
try {
channel.close().sync();
} catch (InterruptedException e)
{
} catch (InterruptedException e) {
throw new RedisException(e);
}

channel = null;
}
}

}

public boolean isClosed()
{
public boolean isClosed() {
return closed;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* @author Will Glozer
*/
public enum CommandKeyword {
ADDSLOTS, AFTER, AGGREGATE, ALPHA, AND, ASC, BEFORE, BY, COUNT, DELSLOTS, DESC, SOFT, HARD, ENCODING, FAILOVER, FORGET, FLUSH, FORCE, FLUSHSLOTS, GETNAME, GETKEYSINSLOT, IDLETIME, KILL, LEN, LIMIT, LIST, LOAD, MATCH, MAX, MEET, MIN, MOVED, NO, NODE, NODES, NOSAVE, NOT, ONE, OR, PAUSE, REFCOUNT, REPLICATE, RESET, REWRITE, RESETSTAT, SETNAME, SETSLOT, MIGRATING, IMPORTING, SLAVES, STORE, SUM, WEIGHTS, WITHSCORES, XOR, REMOVE;
ADDSLOTS, AFTER, AGGREGATE, ALPHA, AND, ASC, BEFORE, BY, COUNT, DELSLOTS, DESC, SOFT, HARD, ENCODING, FAILOVER, FORGET, FLUSH, FORCE, FLUSHSLOTS, GETNAME, GETKEYSINSLOT, IDLETIME, KILL, LEN, LIMIT, LIST, LOAD, MATCH, MAX, MEET, MIN, MOVED, NO, NODE, NODES, NOSAVE, NOT, ONE, OR, PAUSE, REFCOUNT, REPLICATE, RESET, REWRITE, RESETSTAT, SETNAME, SETSLOT, MIGRATING, IMPORTING, SLAVES, STORE, SUM, SEGFAULT, WEIGHTS, WITHSCORES, XOR, REMOVE;

public byte[] bytes;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.junit.Ignore;
import org.junit.Test;

public class ServerCommandTest extends AbstractCommandTest {
Expand Down Expand Up @@ -96,6 +97,12 @@ public void debugObject() throws Exception {
redis.debugObject(key);
}

@Test
@Ignore("This test will kill your redis server, therefore it's disabled by default")
public void debugSegfault() throws Exception {
redis.debugSegfault();
}

@Test
public void flushall() throws Exception {
redis.set(key, value);
Expand Down

0 comments on commit a47686c

Please sign in to comment.