Skip to content

Commit

Permalink
Fix StreamReadOutput when XREADGROUP doesn't report the body #1474
Browse files Browse the repository at this point in the history
We now correctly decode stream messages that are deleted (i.e. don't return the body).
  • Loading branch information
mp911de committed Oct 26, 2020
1 parent aca896e commit 39cb373
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 2 deletions.
12 changes: 10 additions & 2 deletions src/main/java/io/lettuce/core/output/StreamReadOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public class StreamReadOutput<K, V> extends CommandOutput<K, V, List<StreamMessa

private Map<K, V> body;

private boolean bodyReceived = false;

public StreamReadOutput(RedisCodec<K, V> codec) {
super(codec, Collections.emptyList());
setSubscriber(ListSubscriber.instance());
Expand All @@ -70,6 +72,11 @@ public void set(ByteBuffer bytes) {
}

if (key == null) {
bodyReceived = true;
if (bytes == null) {
return;
}

key = codec.decodeKey(bytes);
return;
}
Expand All @@ -94,8 +101,9 @@ public void multi(int count) {
@Override
public void complete(int depth) {

if (depth == 3 && body != null) {
subscriber.onNext(output, new StreamMessage<>(stream, id, body));
if (depth == 3 && bodyReceived) {
subscriber.onNext(output, new StreamMessage<>(stream, id, body == null ? Collections.emptyMap() : body));
bodyReceived = false;
key = null;
body = null;
id = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,22 @@ void xgroupread() {
assertThat(read1).hasSize(1);
}

@Test
void xgroupreadDeletedMessage() {

redis.xgroupCreate(StreamOffset.latest(key), "del-group", XGroupCreateArgs.Builder.mkstream());
redis.xadd(key, Collections.singletonMap("key", "value1"));
redis.xreadgroup(Consumer.from("del-group", "consumer1"), StreamOffset.lastConsumed(key));

redis.xadd(key, XAddArgs.Builder.maxlen(1), Collections.singletonMap("key", "value2"));

List<StreamMessage<String, String>> messages = redis.xreadgroup(Consumer.from("del-group", "consumer1"),
StreamOffset.from(key, "0-0"));

assertThat(messages).hasSize(1);
assertThat(messages.get(0).getBody()).isEmpty();
}

@Test
void xpendingWithoutRead() {

Expand Down

0 comments on commit 39cb373

Please sign in to comment.