diff --git a/src/main/java/io/lettuce/core/output/StreamReadOutput.java b/src/main/java/io/lettuce/core/output/StreamReadOutput.java index 587d2a59b0..f4f6e9d26c 100644 --- a/src/main/java/io/lettuce/core/output/StreamReadOutput.java +++ b/src/main/java/io/lettuce/core/output/StreamReadOutput.java @@ -44,6 +44,8 @@ public class StreamReadOutput extends CommandOutput body; + private boolean bodyReceived = false; + public StreamReadOutput(RedisCodec codec) { super(codec, Collections.emptyList()); setSubscriber(ListSubscriber.instance()); @@ -63,6 +65,8 @@ public void set(ByteBuffer bytes) { } if (key == null) { + bodyReceived = true; + if (bytes == null) { return; } @@ -82,6 +86,10 @@ public void set(ByteBuffer bytes) { @Override public void multi(int count) { + if (id != null && key == null && count == -1) { + bodyReceived = true; + } + if (!initialized) { output = OutputFactory.newList(count); initialized = true; diff --git a/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java b/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java index d3495fe769..bc55a8c2e0 100644 --- a/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java +++ b/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java @@ -15,11 +15,15 @@ */ package io.lettuce.core.commands; -import static io.lettuce.core.protocol.CommandType.XINFO; -import static org.assertj.core.api.Assertions.assertThat; +import static io.lettuce.core.protocol.CommandType.*; +import static org.assertj.core.api.Assertions.*; import java.time.Instant; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import javax.inject.Inject; @@ -28,12 +32,20 @@ import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.extension.ExtendWith; -import io.lettuce.core.*; +import io.lettuce.core.Consumer; +import io.lettuce.core.Limit; +import io.lettuce.core.Range; +import io.lettuce.core.StreamMessage; +import io.lettuce.core.TestSupport; +import io.lettuce.core.TransactionResult; +import io.lettuce.core.XAddArgs; +import io.lettuce.core.XClaimArgs; +import io.lettuce.core.XGroupCreateArgs; +import io.lettuce.core.XReadArgs; import io.lettuce.core.XReadArgs.StreamOffset; import io.lettuce.core.api.sync.RedisCommands; import io.lettuce.core.codec.StringCodec; import io.lettuce.core.models.stream.PendingMessage; -import io.lettuce.core.models.stream.PendingMessages; import io.lettuce.core.models.stream.PendingParser; import io.lettuce.core.output.NestedMultiOutput; import io.lettuce.core.protocol.CommandArgs; @@ -310,6 +322,24 @@ void xgroupreadDeletedMessage() { assertThat(messages.get(0).getBody()).isEmpty(); } + @Test + void xgroupreadTrimmedMessage() { + + for (int i = 0; i < 10; i++) { + redis.xadd(key, Collections.singletonMap("key", "value1")); + } + + redis.xgroupCreate(StreamOffset.from(key, "0-0"), "del-group", XGroupCreateArgs.Builder.mkstream()); + + redis.xreadgroup(Consumer.from("del-group", "consumer1"), XReadArgs.Builder.count(10), StreamOffset.lastConsumed(key)); + redis.xtrim(key, 1); + + List> messages = redis.xreadgroup(Consumer.from("del-group", "consumer1"), + XReadArgs.Builder.count(10), StreamOffset.from(key, "0-0")); + + assertThat(messages).hasSize(10); + } + @Test void xpendingWithGroup() {