diff --git a/src/main/java/io/lettuce/core/protocol/RedisStateMachine.java b/src/main/java/io/lettuce/core/protocol/RedisStateMachine.java index 14866a9558..ee614b1d20 100644 --- a/src/main/java/io/lettuce/core/protocol/RedisStateMachine.java +++ b/src/main/java/io/lettuce/core/protocol/RedisStateMachine.java @@ -46,29 +46,34 @@ public class RedisStateMachine { static class State { + /** + * Callback interface to handle a {@link State}. + */ @FunctionalInterface - interface StateBehavior { - Result reaction(RedisStateMachine rsm, State state, ByteBuf buffer, RedisCommand command, + interface StateHandler { + + Result handle(RedisStateMachine rsm, State state, ByteBuf buffer, RedisCommand command, CommandOutput output); } - enum Type { - SINGLE(RedisStateMachine::reactionStateSingle), - ERROR(RedisStateMachine::reactionStateError), - INTEGER(RedisStateMachine::reactionStateInteger), - BULK(RedisStateMachine::reactionStateBulk), - MULTI(RedisStateMachine::reactionStateMulti), - BYTES(RedisStateMachine::reactionStateBytes); + enum Type implements StateHandler { + + SINGLE(RedisStateMachine::handleSingle), // + ERROR(RedisStateMachine::handleError), // + INTEGER(RedisStateMachine::handleInteger), // + BULK(RedisStateMachine::handleBulk), // + MULTI(RedisStateMachine::handleMulti), // + BYTES(RedisStateMachine::handleBytes); - private final StateBehavior behavior; + private final StateHandler behavior; - Type(StateBehavior behavior) { + Type(StateHandler behavior) { this.behavior = behavior; } - public Result reaction(RedisStateMachine rsm, State state, ByteBuf buffer, RedisCommand command, + public Result handle(RedisStateMachine rsm, State state, ByteBuf buffer, RedisCommand command, CommandOutput output) { - return behavior.reaction(rsm, state, buffer, command, output); + return behavior.handle(rsm, state, buffer, command, output); } } @@ -79,7 +84,6 @@ enum Result { Type type = null; int count = -1; - } private final State[] stack = new State[32]; @@ -120,7 +124,9 @@ public boolean decode(ByteBuf buffer, CommandOutput output) { * @return {@code true} if a complete response was read. */ public boolean decode(ByteBuf buffer, RedisCommand command, CommandOutput output) { + buffer.touch("RedisStateMachine.decode(…)"); + if (debugEnabled) { logger.debug("Decode {}", command); } @@ -133,7 +139,7 @@ public boolean decode(ByteBuf buffer, RedisCommand command, CommandOutp return isEmpty(stack); } - loopPeekStack(buffer, command, output); + doDecode(buffer, command, output); if (debugEnabled) { logger.debug("Decoded {}, empty stack: {}", command, isEmpty(stack)); @@ -142,7 +148,8 @@ public boolean decode(ByteBuf buffer, RedisCommand command, CommandOutp return isEmpty(stack); } - private void loopPeekStack(ByteBuf buffer, RedisCommand command, CommandOutput output) { + private void doDecode(ByteBuf buffer, RedisCommand command, CommandOutput output) { + State.Result result; while (!isEmpty(stack)) { @@ -156,7 +163,7 @@ private void loopPeekStack(ByteBuf buffer, RedisCommand command, Comman buffer.markReaderIndex(); } - result = state.type.reaction(this, state, buffer, command, output); + result = state.type.handle(this, state, buffer, command, output); if (State.Result.BREAK_LOOP.equals(result)) { break; } else if (State.Result.CONTINUE_LOOP.equals(result)) { @@ -170,8 +177,9 @@ private void loopPeekStack(ByteBuf buffer, RedisCommand command, Comman } } - private static State.Result reactionStateSingle(RedisStateMachine rsm, State state, ByteBuf buffer, - RedisCommand command, CommandOutput output) { + private static State.Result handleSingle(RedisStateMachine rsm, State state, ByteBuf buffer, RedisCommand command, + CommandOutput output) { + ByteBuffer bytes; if ((bytes = rsm.readLine(buffer)) == null) { return State.Result.BREAK_LOOP; @@ -183,8 +191,9 @@ private static State.Result reactionStateSingle(RedisStateMachine rsm, State sta return State.Result.NORMAL_END; } - private static State.Result reactionStateError(RedisStateMachine rsm, State state, ByteBuf buffer, RedisCommand command, + private static State.Result handleError(RedisStateMachine rsm, State state, ByteBuf buffer, RedisCommand command, CommandOutput output) { + ByteBuffer bytes; if ((bytes = rsm.readLine(buffer)) == null) { return State.Result.BREAK_LOOP; @@ -194,8 +203,9 @@ private static State.Result reactionStateError(RedisStateMachine rsm, State stat return State.Result.NORMAL_END; } - private static State.Result reactionStateInteger(RedisStateMachine rsm, State state, ByteBuf buffer, - RedisCommand command, CommandOutput output) { + private static State.Result handleInteger(RedisStateMachine rsm, State state, ByteBuf buffer, RedisCommand command, + CommandOutput output) { + int end; if ((end = rsm.findLineEnd(buffer)) == -1) { return State.Result.BREAK_LOOP; @@ -206,8 +216,9 @@ private static State.Result reactionStateInteger(RedisStateMachine rsm, State st return State.Result.NORMAL_END; } - private static State.Result reactionStateBulk(RedisStateMachine rsm, State state, ByteBuf buffer, RedisCommand command, + private static State.Result handleBulk(RedisStateMachine rsm, State state, ByteBuf buffer, RedisCommand command, CommandOutput output) { + int length; int end; if ((end = rsm.findLineEnd(buffer)) == -1) { @@ -223,11 +234,13 @@ private static State.Result reactionStateBulk(RedisStateMachine rsm, State state buffer.markReaderIndex(); return State.Result.CONTINUE_LOOP; } + return State.Result.NORMAL_END; } - private static State.Result reactionStateMulti(RedisStateMachine rsm, State state, ByteBuf buffer, RedisCommand command, + private static State.Result handleMulti(RedisStateMachine rsm, State state, ByteBuf buffer, RedisCommand command, CommandOutput output) { + int length; int end; if (state.count == -1) { @@ -250,12 +263,14 @@ private static State.Result reactionStateMulti(RedisStateMachine rsm, State stat return State.Result.CONTINUE_LOOP; } - private static State.Result reactionStateBytes(RedisStateMachine rsm, State state, ByteBuf buffer, RedisCommand command, + private static State.Result handleBytes(RedisStateMachine rsm, State state, ByteBuf buffer, RedisCommand command, CommandOutput output) { + ByteBuffer bytes; if ((bytes = rsm.readBytes(buffer, state.count)) == null) { return State.Result.BREAK_LOOP; } + rsm.safeSet(output, bytes, command); return State.Result.NORMAL_END; } @@ -297,7 +312,7 @@ private State.Type readReplyType(ByteBuf buffer) { case '*': return MULTI; default: - throw new RedisException("Invalid first byte: " + Byte.toString(b)); + throw new RedisException("Invalid first byte: " + b); } } @@ -518,7 +533,7 @@ public long getValue(ByteBuf buffer, int start, int end) { } @Override - public boolean process(byte value) throws Exception { + public boolean process(byte value) { if (first) { first = false;