diff --git a/src/main/java/io/lettuce/core/output/StreamReadOutput.java b/src/main/java/io/lettuce/core/output/StreamReadOutput.java index 0346aa877b..044897f29e 100644 --- a/src/main/java/io/lettuce/core/output/StreamReadOutput.java +++ b/src/main/java/io/lettuce/core/output/StreamReadOutput.java @@ -46,6 +46,8 @@ public class StreamReadOutput extends CommandOutput body; + private boolean bodyReceived = false; + public StreamReadOutput(RedisCodec codec) { super(codec, Collections.emptyList()); setSubscriber(ListSubscriber.instance()); @@ -70,6 +72,11 @@ public void set(ByteBuffer bytes) { } if (key == null) { + bodyReceived = true; + if (bytes == null) { + return; + } + key = codec.decodeKey(bytes); return; } @@ -94,8 +101,9 @@ public void multi(int count) { @Override public void complete(int depth) { - if (depth == 3 && body != null) { - subscriber.onNext(output, new StreamMessage<>(stream, id, body)); + if (depth == 3 && bodyReceived) { + subscriber.onNext(output, new StreamMessage<>(stream, id, body == null ? Collections.emptyMap() : body)); + bodyReceived = false; key = null; body = null; id = null; diff --git a/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java b/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java index caab778ef2..2860b73a86 100644 --- a/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java +++ b/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java @@ -320,6 +320,22 @@ void xgroupread() { assertThat(read1).hasSize(1); } + @Test + void xgroupreadDeletedMessage() { + + redis.xgroupCreate(StreamOffset.latest(key), "del-group", XGroupCreateArgs.Builder.mkstream()); + redis.xadd(key, Collections.singletonMap("key", "value1")); + redis.xreadgroup(Consumer.from("del-group", "consumer1"), StreamOffset.lastConsumed(key)); + + redis.xadd(key, XAddArgs.Builder.maxlen(1), Collections.singletonMap("key", "value2")); + + List> messages = redis.xreadgroup(Consumer.from("del-group", "consumer1"), + StreamOffset.from(key, "0-0")); + + assertThat(messages).hasSize(1); + assertThat(messages.get(0).getBody()).isEmpty(); + } + @Test void xpendingWithoutRead() {