diff --git a/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java b/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java index 4f599f638f..fc0e844636 100644 --- a/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java +++ b/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java @@ -158,9 +158,7 @@ public RedisCommand dispatch(RedisCommand command) { RedisCommand toSend = preProcessCommand(command); - if (command.getType().name().equals(MULTI.name())) { - multi = (multi == null ? new MultiOutput<>(codec) : multi); - } + potentiallyEnableMulti(command); return super.dispatch(toSend); } @@ -174,14 +172,28 @@ public RedisCommand dispatch(RedisCommand command) { RedisCommand command = preProcessCommand(o); sentCommands.add(command); - if (command.getType().name().equals(MULTI.name())) { - multi = (multi == null ? new MultiOutput<>(codec) : multi); - } + potentiallyEnableMulti(command); }); return super.dispatch(sentCommands); } + private void potentiallyEnableMulti(RedisCommand command) { + + if (command.getType().name().equals(MULTI.name())) { + + multi = (multi == null ? new MultiOutput<>(codec) : multi); + + if (command instanceof CompleteableCommand) { + ((CompleteableCommand) command).onComplete((ignored, e) -> { + if (e != null) { + multi = null; + } + }); + } + } + } + protected RedisCommand preProcessCommand(RedisCommand command) { RedisCommand local = command;