diff --git a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java index 46834fc260..d245520ba1 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java @@ -1842,6 +1842,11 @@ public RedisFuture> xpending(K key, Consumer consumer, R return dispatch(commandBuilder.xpending(key, consumer, range, limit)); } + @Override + public RedisFuture> xpending(K key, XPendingArgs args) { + return dispatch(commandBuilder.xpending(key, args)); + } + @Override public RedisFuture>> xrange(K key, Range range) { return dispatch(commandBuilder.xrange(key, range, Limit.unlimited())); diff --git a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java index d338e50331..37713bf93f 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java @@ -1917,6 +1917,11 @@ public Flux xpending(K key, Consumer consumer, Range return createDissolvingFlux(() -> commandBuilder.xpending(key, consumer, range, limit)); } + @Override + public Flux xpending(K key, XPendingArgs args) { + return createDissolvingFlux(() -> commandBuilder.xpending(key, args)); + } + @Override public Flux> xrange(K key, Range range) { return createDissolvingFlux(() -> commandBuilder.xrange(key, range, Limit.unlimited())); diff --git a/src/main/java/io/lettuce/core/RedisCommandBuilder.java b/src/main/java/io/lettuce/core/RedisCommandBuilder.java index d159d4da48..ef360da897 100644 --- a/src/main/java/io/lettuce/core/RedisCommandBuilder.java +++ b/src/main/java/io/lettuce/core/RedisCommandBuilder.java @@ -2619,6 +2619,15 @@ public Command> xpending(K key, Consumer consumer, return createCommand(XPENDING, new PendingMessageListOutput<>(codec), args); } + public Command> xpending(K key, XPendingArgs commandArgs) { + notNullKey(key); + LettuceAssert.notNull(commandArgs, "XPendingArgs " + MUST_NOT_BE_NULL); + + CommandArgs args = new CommandArgs<>(codec).addKey(key); + commandArgs.build(args); + return createCommand(XPENDING, new PendingMessageListOutput<>(codec), args); + } + public Command>> xrange(K key, Range range, Limit limit) { notNullKey(key); LettuceAssert.notNull(range, "Range " + MUST_NOT_BE_NULL); diff --git a/src/main/java/io/lettuce/core/XPendingArgs.java b/src/main/java/io/lettuce/core/XPendingArgs.java new file mode 100644 index 0000000000..d5053b642b --- /dev/null +++ b/src/main/java/io/lettuce/core/XPendingArgs.java @@ -0,0 +1,112 @@ +/* + * Copyright 2018-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core; + +import io.lettuce.core.internal.LettuceAssert; +import io.lettuce.core.protocol.CommandArgs; +import io.lettuce.core.protocol.CommandKeyword; + +/** + * Argument list builder for the Redis XPENDING command. + * Static import the methods from {@link XPendingArgs.Builder} and call the methods: {@code block(…)} . + *

+ * {@link XPendingArgs} is a mutable object and instances should be used only once to avoid shared mutable state. + * + * @author dengliming + * @since 6.1 + */ +public class XPendingArgs { + + private Consumer consumer; + + private Range range; + + private Limit limit; + + private Long idle; + + /** + * Builder entry points for {@link XPendingArgs}. + */ + public static class Builder { + + /** + * Utility constructor. + */ + private Builder() { + } + + /** + * Create a new {@link XPendingArgs} . + * + * @param consumer + * @param range + * @param limit + */ + public static XPendingArgs xpending(Consumer consumer, Range range, Limit limit) { + return new XPendingArgs().consumer(consumer).range(range).limit(limit); + } + } + + public XPendingArgs range(Range range) { + LettuceAssert.notNull(range, "Range must not be null"); + + this.range = range; + return this; + } + + public XPendingArgs consumer(Consumer consumer) { + LettuceAssert.notNull(consumer, "Consumer must not be null"); + + this.consumer = consumer; + return this; + } + + public XPendingArgs limit(Limit limit) { + LettuceAssert.notNull(limit, "Limit must not be null"); + + this.limit = limit; + return this; + } + + public XPendingArgs idle(Long idle) { + this.idle = idle; + return this; + } + + public void build(CommandArgs args) { + args.addKey(consumer.group); + + if (idle != null) { + args.add(CommandKeyword.IDLE).add(idle); + } + + if (range.getLower().equals(Range.Boundary.unbounded())) { + args.add("-"); + } else { + args.add(range.getLower().getValue()); + } + + if (range.getUpper().equals(Range.Boundary.unbounded())) { + args.add("+"); + } else { + args.add(range.getUpper().getValue()); + } + + args.add(limit.isLimited() ? limit.getCount() : Long.MAX_VALUE); + args.addKey(consumer.name); + } +} diff --git a/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java b/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java index 106d471951..66479aef8b 100644 --- a/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java +++ b/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java @@ -241,6 +241,15 @@ public interface RedisStreamAsyncCommands { */ RedisFuture> xpending(K key, Consumer consumer, Range range, Limit limit); + /** + * Read pending messages from a stream within a specific {@link XPendingArgs}. + * + * @param key the stream key. + * @param args + * @return List<Object> array-reply list with members of the resulting stream. + */ + RedisFuture> xpending(K key, XPendingArgs args); + /** * Read messages from a stream within a specific {@link Range}. * diff --git a/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java b/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java index 29f6e1525f..24b2c1bd48 100644 --- a/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java +++ b/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java @@ -242,6 +242,15 @@ public interface RedisStreamReactiveCommands { */ Flux xpending(K key, Consumer consumer, Range range, Limit limit); + /** + * Read pending messages from a stream within a specific {@link XPendingArgs}. + * + * @param key the stream key. + * @param args + * @return Object array-reply list with members of the resulting stream. + */ + Flux xpending(K key, XPendingArgs args); + /** * Read messages from a stream within a specific {@link Range}. * diff --git a/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java b/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java index fe002f782c..8bcdf2d95d 100644 --- a/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java +++ b/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java @@ -241,6 +241,15 @@ public interface RedisStreamCommands { */ List xpending(K key, Consumer consumer, Range range, Limit limit); + /** + * Read pending messages from a stream within a specific {@link XPendingArgs}. + * + * @param key the stream key. + * @param args + * @return List<Object> array-reply list with members of the resulting stream. + */ + List xpending(K key, XPendingArgs args); + /** * Read messages from a stream within a specific {@link Range}. * diff --git a/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java b/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java index e57189a0ee..e597938c51 100644 --- a/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java @@ -241,6 +241,15 @@ public interface NodeSelectionStreamAsyncCommands { */ AsyncExecutions> xpending(K key, Consumer consumer, Range range, Limit limit); + /** + * Read pending messages from a stream within a specific {@link XPendingArgs}. + * + * @param key the stream key. + * @param args + * @return List<Object> array-reply list with members of the resulting stream. + */ + AsyncExecutions> xpending(K key, XPendingArgs args); + /** * Read messages from a stream within a specific {@link Range}. * diff --git a/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java b/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java index e52cb57fb9..75be17ff42 100644 --- a/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java @@ -241,6 +241,15 @@ public interface NodeSelectionStreamCommands { */ Executions> xpending(K key, Consumer consumer, Range range, Limit limit); + /** + * Read pending messages from a stream within a specific {@link XPendingArgs}. + * + * @param key the stream key. + * @param args + * @return List<Object> array-reply list with members of the resulting stream. + */ + Executions> xpending(K key, XPendingArgs args); + /** * Read messages from a stream within a specific {@link Range}. * diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommands.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommands.kt index 33d94c95bf..05f56a5482 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommands.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommands.kt @@ -241,6 +241,15 @@ interface RedisStreamCoroutinesCommands { */ fun xpending(key: K, consumer: Consumer, range: Range, limit: Limit): Flow + /** + * Read pending messages from a stream within a specific [XPendingArgs]. + * + * @param key the stream key. + * @param args + * @return List array-reply list with members of the resulting stream. + */ + fun xpending(key: K, args: XPendingArgs): Flow + /** * Read messages from a stream within a specific [Range]. * diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommandsImpl.kt index 05d18c3246..0a8f041e46 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommandsImpl.kt @@ -79,6 +79,8 @@ internal class RedisStreamCoroutinesCommandsImpl(internal val override fun xpending(key: K, consumer: Consumer, range: Range, limit: Limit): Flow = ops.xpending(key, consumer, range, limit).asFlow() + override fun xpending(key: K, args: XPendingArgs): Flow = ops.xpending(key, args).asFlow() + override fun xrange(key: K, range: Range): Flow> = ops.xrange(key, range).asFlow() override fun xrange(key: K, range: Range, limit: Limit): Flow> = ops.xrange(key, range, limit).asFlow() diff --git a/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java b/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java index 2432c816cd..6e63789d39 100644 --- a/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java +++ b/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java @@ -247,6 +247,15 @@ public interface RedisStreamCommands { */ List xpending(K key, Consumer consumer, Range range, Limit limit); + /** + * Read pending messages from a stream within a specific {@link XPendingArgs}. + * + * @param key the stream key. + * @param args + * @return List<Object> array-reply list with members of the resulting stream. + */ + List xpending(K key, XPendingArgs args); + /** * Read messages from a stream within a specific {@link Range}. * diff --git a/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java b/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java index dfc97fa8cb..198f4c3e07 100644 --- a/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java +++ b/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java @@ -28,6 +28,7 @@ import javax.inject.Inject; +import io.lettuce.core.XPendingArgs; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -424,6 +425,24 @@ void xpending() { assertThat(message.getRedeliveryCount()).isEqualTo(1); } + @Test + @EnabledOnCommand("XAUTOCLAIM") // Redis 6.2 + void xpendingWithArgs() { + + redis.xgroupCreate(StreamOffset.latest(key), "group", XGroupCreateArgs.Builder.mkstream()); + String id = redis.xadd(key, Collections.singletonMap("key", "value")); + + redis.xreadgroup(Consumer.from("group", "consumer1"), StreamOffset.lastConsumed(key)); + + List pendingEntries = redis.xpending(key, XPendingArgs.Builder + .xpending(Consumer.from("group", "consumer1"), Range.unbounded(), Limit.from(10)).idle(30L)); + + PendingMessage message = pendingEntries.get(0); + assertThat(message.getId()).isEqualTo(id); + assertThat(message.getConsumer()).isEqualTo("consumer1"); + assertThat(message.getRedeliveryCount()).isEqualTo(1); + } + @Test @EnabledOnCommand("XAUTOCLAIM") // Redis 6.2 void xpendingRanges() {