Skip to content

Commit

Permalink
Inspect Pub/Sub responses for interleaved messages #724
Browse files Browse the repository at this point in the history
Lettuce now inspects Redis responses via PubSubCommandHandler and ReplayOutput whether a received response is a Pub/Sub message or whether the response belongs to a command on the protocol stack. Introspection is required as Redis responses may contain interleaved messages that do not belong to a command or may arrive before the command response.

Previously, interleaved messages could get used to complete commands on the protocol stack which causes a defunct protocol state.
  • Loading branch information
mp911de committed Mar 19, 2018
1 parent 7bf7838 commit ecbe365
Show file tree
Hide file tree
Showing 6 changed files with 573 additions and 18 deletions.
197 changes: 197 additions & 0 deletions src/main/java/com/lambdaworks/redis/output/ReplayOutput.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.lambdaworks.redis.output;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.codec.StringCodec;

/**
* Replayable {@link CommandOutput} capturing output signals to replay these on a target {@link CommandOutput}. Replay is useful
* when the response requires inspection prior to dispatching the actual output to a command target.
*
* @author Mark Paluch
* @since 4.4.4
*/
public class ReplayOutput<K, V> extends CommandOutput<K, V, List<ReplayOutput.Signal>> {

/**
* Initialize a new instance that encodes and decodes keys and values using the supplied codec.
*/
public ReplayOutput() {
super((RedisCodec) StringCodec.ASCII, new ArrayList<>());
}

@Override
public void set(ByteBuffer bytes) {
output.add(new BulkString(bytes));
}

@Override
public void set(long integer) {
output.add(new Integer(integer));
}

@Override
public void setError(ByteBuffer error) {
error.mark();
output.add(new ErrorBytes(error));
error.reset();
super.setError(error);
}

@Override
public void setError(String error) {
output.add(new ErrorString(error));
super.setError(error);
}

@Override
public void complete(int depth) {
output.add(new Complete(depth));
}

@Override
public void multi(int count) {
output.add(new Multi(count));
}

/**
* Replay all captured signals on a {@link CommandOutput}.
*
* @param target the target {@link CommandOutput}.
*/
public void replay(CommandOutput<?, ?, ?> target) {

for (Signal signal : output) {
signal.replay(target);
}
}

/**
* Encapsulates a replayable decoding signal.
*/
public static abstract class Signal {

/**
* Replay the signal on a {@link CommandOutput}.
*
* @param target
*/
protected abstract void replay(CommandOutput<?, ?, ?> target);
}

abstract static class BulkStringSupport extends Signal {

final ByteBuffer message;

BulkStringSupport(ByteBuffer message) {

if (message != null) {

// need to copy the buffer to prevent buffer lifecycle mismatch
this.message = ByteBuffer.allocate(message.remaining());
this.message.put(message);
this.message.rewind();
} else {
this.message = null;
}
}
}

public static class BulkString extends BulkStringSupport {

BulkString(ByteBuffer message) {
super(message);
}

@Override
protected void replay(CommandOutput<?, ?, ?> target) {
target.set(message);
}
}

static class Integer extends Signal {

final long message;

Integer(long message) {
this.message = message;
}

@Override
protected void replay(CommandOutput<?, ?, ?> target) {
target.set(message);
}
}

public static class ErrorBytes extends BulkStringSupport {

ErrorBytes(ByteBuffer message) {
super(message);
}

@Override
protected void replay(CommandOutput<?, ?, ?> target) {
target.setError(message);
}
}

static class ErrorString extends Signal {

final String message;

ErrorString(String message) {
this.message = message;
}

@Override
protected void replay(CommandOutput<?, ?, ?> target) {
target.setError(message);
}
}

static class Multi extends Signal {

final int count;

Multi(int count) {
this.count = count;
}

@Override
protected void replay(CommandOutput<?, ?, ?> target) {
target.multi(count);
}
}

static class Complete extends Signal {

final int depth;

public Complete(int depth) {
this.depth = depth;
}

@Override
protected void replay(CommandOutput<?, ?, ?> target) {
target.complete(depth);
}
}
}
69 changes: 57 additions & 12 deletions src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.lambdaworks.redis.internal.LettuceClassUtils;
import com.lambdaworks.redis.internal.LettuceFactories;
import com.lambdaworks.redis.internal.LettuceSets;
import com.lambdaworks.redis.output.CommandOutput;
import com.lambdaworks.redis.resource.ClientResources;

import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -145,6 +146,10 @@ public CommandHandler(ClientOptions clientOptions, ClientResources clientResourc
boundedQueue = clientOptions.getRequestQueueSize() != Integer.MAX_VALUE;
}

protected Deque<RedisCommand<K, V, ?>> getStack() {
return stack;
}

@Override
public void setRedisChannelHandler(RedisChannelHandler<K, V> redisChannelHandler) {
this.redisChannelHandler = redisChannelHandler;
Expand Down Expand Up @@ -301,19 +306,21 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) {
}

if (isProtectedMode(command)) {
onProtectedMode(command.getOutput().getError());
onProtectedMode(getCommandOutput(command).getError());
} else {

stack.poll();
if (canComplete(command)) {
stack.poll();

try {
command.complete();
} catch (Exception e) {
logger.warn("{} Unexpected exception during command completion: {}", logPrefix, e.toString(), e);
try {
complete(command);
} catch (Exception e) {
logger.warn("{} Unexpected exception during command completion: {}", logPrefix, e.toString(), e);
}
}
}

afterComplete(ctx, command);
afterDecode(ctx, command);
}

if (buffer.refCnt() != 0) {
Expand Down Expand Up @@ -383,19 +390,57 @@ private void onProtectedMode(String message) {
cancelCommands(message);
}

/**
* Decoding hook: Can the buffer be decoded to a command.
*
* @param buffer
* @return
*/
protected boolean canDecode(ByteBuf buffer) {
return !stack.isEmpty() && buffer.isReadable();
}

/**
* Decoding hook: Can the command be completed.
*
* @param command
* @return
*/
protected boolean canComplete(RedisCommand<?, ?, ?> command) {
return true;
}

/**
* Decoding hook: Complete a command.
*
* @param command
* @see RedisCommand#complete()
*/
protected void complete(RedisCommand<?, ?, ?> command) {
command.complete();
}

/**
* Hook method called after command completion.
*
* @param ctx
* @param command
*/
protected void afterComplete(ChannelHandlerContext ctx, RedisCommand<K, V, ?> command) {
protected void afterDecode(ChannelHandlerContext ctx, RedisCommand<K, V, ?> command) {
}

protected boolean canDecode(ByteBuf buffer) {
return !stack.isEmpty() && buffer.isReadable();
/**
* Decoding hook: Retrieve {@link CommandOutput} for {@link RedisCommand} decoding.
*
* @param command
* @return
* @see RedisCommand#getOutput()
*/
protected CommandOutput<K, V, ?> getCommandOutput(RedisCommand<K, V, ?> command) {
return command.getOutput();
}


private boolean decode(ByteBuf buffer, RedisCommand<K, V, ?> command) {

if (latencyMetricsEnabled && command instanceof WithLatency) {
Expand All @@ -405,7 +450,7 @@ private boolean decode(ByteBuf buffer, RedisCommand<K, V, ?> command) {
withLatency.firstResponse(nanoTime());
}

if (!rsm.decode(buffer, command, command.getOutput())) {
if (!rsm.decode(buffer, command, getCommandOutput(command))) {
return false;
}

Expand All @@ -414,7 +459,7 @@ private boolean decode(ByteBuf buffer, RedisCommand<K, V, ?> command) {
return true;
}

return rsm.decode(buffer, command, command.getOutput());
return rsm.decode(buffer, command, getCommandOutput(command));
}

private void recordLatency(WithLatency withLatency, ProtocolKeyword commandType) {
Expand Down
Loading

0 comments on commit ecbe365

Please sign in to comment.