From 9833d7984ccef99dc248c0ab71fe6ff399de052a Mon Sep 17 00:00:00 2001 From: dengliming Date: Wed, 3 Mar 2021 00:16:09 +0800 Subject: [PATCH] Add support for IDLE option in XPENDING --- .../core/AbstractRedisAsyncCommands.java | 5 + .../core/AbstractRedisReactiveCommands.java | 5 + .../io/lettuce/core/RedisCommandBuilder.java | 9 ++ .../java/io/lettuce/core/XPendingArgs.java | 112 ++++++++++++++++++ .../api/async/RedisStreamAsyncCommands.java | 9 ++ .../reactive/RedisStreamReactiveCommands.java | 9 ++ .../core/api/sync/RedisStreamCommands.java | 9 ++ .../NodeSelectionStreamAsyncCommands.java | 9 ++ .../api/sync/NodeSelectionStreamCommands.java | 9 ++ .../RedisStreamCoroutinesCommands.kt | 9 ++ .../RedisStreamCoroutinesCommandsImpl.kt | 2 + .../lettuce/core/api/RedisStreamCommands.java | 9 ++ .../StreamCommandIntegrationTests.java | 18 +++ 13 files changed, 214 insertions(+) create mode 100644 src/main/java/io/lettuce/core/XPendingArgs.java diff --git a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java index ffd0895ee1..6e45ed2447 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java @@ -1722,6 +1722,11 @@ public RedisFuture> xpending(K key, Consumer consumer, R return dispatch(commandBuilder.xpending(key, consumer, range, limit)); } + @Override + public RedisFuture> xpending(K key, XPendingArgs args) { + return dispatch(commandBuilder.xpending(key, args)); + } + @Override public RedisFuture>> xrange(K key, Range range) { return dispatch(commandBuilder.xrange(key, range, Limit.unlimited())); diff --git a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java index f62bafad56..7507ed2e40 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java @@ -1799,6 +1799,11 @@ public Flux xpending(K key, Consumer consumer, Range return createDissolvingFlux(() -> commandBuilder.xpending(key, consumer, range, limit)); } + @Override + public Flux xpending(K key, XPendingArgs args) { + return createDissolvingFlux(() -> commandBuilder.xpending(key, args)); + } + @Override public Flux> xrange(K key, Range range) { return createDissolvingFlux(() -> commandBuilder.xrange(key, range, Limit.unlimited())); diff --git a/src/main/java/io/lettuce/core/RedisCommandBuilder.java b/src/main/java/io/lettuce/core/RedisCommandBuilder.java index 9470d17bfa..1030c22c14 100644 --- a/src/main/java/io/lettuce/core/RedisCommandBuilder.java +++ b/src/main/java/io/lettuce/core/RedisCommandBuilder.java @@ -2469,6 +2469,15 @@ public Command> xpending(K key, Consumer consumer, return createCommand(XPENDING, new PendingMessageListOutput<>(codec), args); } + public Command> xpending(K key, XPendingArgs commandArgs) { + notNullKey(key); + LettuceAssert.notNull(commandArgs, "XPendingArgs " + MUST_NOT_BE_NULL); + + CommandArgs args = new CommandArgs<>(codec).addKey(key); + commandArgs.build(args); + return createCommand(XPENDING, new PendingMessageListOutput<>(codec), args); + } + public Command>> xrange(K key, Range range, Limit limit) { notNullKey(key); LettuceAssert.notNull(range, "Range " + MUST_NOT_BE_NULL); diff --git a/src/main/java/io/lettuce/core/XPendingArgs.java b/src/main/java/io/lettuce/core/XPendingArgs.java new file mode 100644 index 0000000000..d5053b642b --- /dev/null +++ b/src/main/java/io/lettuce/core/XPendingArgs.java @@ -0,0 +1,112 @@ +/* + * Copyright 2018-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core; + +import io.lettuce.core.internal.LettuceAssert; +import io.lettuce.core.protocol.CommandArgs; +import io.lettuce.core.protocol.CommandKeyword; + +/** + * Argument list builder for the Redis XPENDING command. + * Static import the methods from {@link XPendingArgs.Builder} and call the methods: {@code block(…)} . + *

+ * {@link XPendingArgs} is a mutable object and instances should be used only once to avoid shared mutable state. + * + * @author dengliming + * @since 6.1 + */ +public class XPendingArgs { + + private Consumer consumer; + + private Range range; + + private Limit limit; + + private Long idle; + + /** + * Builder entry points for {@link XPendingArgs}. + */ + public static class Builder { + + /** + * Utility constructor. + */ + private Builder() { + } + + /** + * Create a new {@link XPendingArgs} . + * + * @param consumer + * @param range + * @param limit + */ + public static XPendingArgs xpending(Consumer consumer, Range range, Limit limit) { + return new XPendingArgs().consumer(consumer).range(range).limit(limit); + } + } + + public XPendingArgs range(Range range) { + LettuceAssert.notNull(range, "Range must not be null"); + + this.range = range; + return this; + } + + public XPendingArgs consumer(Consumer consumer) { + LettuceAssert.notNull(consumer, "Consumer must not be null"); + + this.consumer = consumer; + return this; + } + + public XPendingArgs limit(Limit limit) { + LettuceAssert.notNull(limit, "Limit must not be null"); + + this.limit = limit; + return this; + } + + public XPendingArgs idle(Long idle) { + this.idle = idle; + return this; + } + + public void build(CommandArgs args) { + args.addKey(consumer.group); + + if (idle != null) { + args.add(CommandKeyword.IDLE).add(idle); + } + + if (range.getLower().equals(Range.Boundary.unbounded())) { + args.add("-"); + } else { + args.add(range.getLower().getValue()); + } + + if (range.getUpper().equals(Range.Boundary.unbounded())) { + args.add("+"); + } else { + args.add(range.getUpper().getValue()); + } + + args.add(limit.isLimited() ? limit.getCount() : Long.MAX_VALUE); + args.addKey(consumer.name); + } +} diff --git a/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java b/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java index 106d471951..66479aef8b 100644 --- a/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java +++ b/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java @@ -241,6 +241,15 @@ public interface RedisStreamAsyncCommands { */ RedisFuture> xpending(K key, Consumer consumer, Range range, Limit limit); + /** + * Read pending messages from a stream within a specific {@link XPendingArgs}. + * + * @param key the stream key. + * @param args + * @return List<Object> array-reply list with members of the resulting stream. + */ + RedisFuture> xpending(K key, XPendingArgs args); + /** * Read messages from a stream within a specific {@link Range}. * diff --git a/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java b/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java index 29f6e1525f..24b2c1bd48 100644 --- a/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java +++ b/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java @@ -242,6 +242,15 @@ public interface RedisStreamReactiveCommands { */ Flux xpending(K key, Consumer consumer, Range range, Limit limit); + /** + * Read pending messages from a stream within a specific {@link XPendingArgs}. + * + * @param key the stream key. + * @param args + * @return Object array-reply list with members of the resulting stream. + */ + Flux xpending(K key, XPendingArgs args); + /** * Read messages from a stream within a specific {@link Range}. * diff --git a/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java b/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java index fe002f782c..8bcdf2d95d 100644 --- a/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java +++ b/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java @@ -241,6 +241,15 @@ public interface RedisStreamCommands { */ List xpending(K key, Consumer consumer, Range range, Limit limit); + /** + * Read pending messages from a stream within a specific {@link XPendingArgs}. + * + * @param key the stream key. + * @param args + * @return List<Object> array-reply list with members of the resulting stream. + */ + List xpending(K key, XPendingArgs args); + /** * Read messages from a stream within a specific {@link Range}. * diff --git a/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java b/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java index e57189a0ee..e597938c51 100644 --- a/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java @@ -241,6 +241,15 @@ public interface NodeSelectionStreamAsyncCommands { */ AsyncExecutions> xpending(K key, Consumer consumer, Range range, Limit limit); + /** + * Read pending messages from a stream within a specific {@link XPendingArgs}. + * + * @param key the stream key. + * @param args + * @return List<Object> array-reply list with members of the resulting stream. + */ + AsyncExecutions> xpending(K key, XPendingArgs args); + /** * Read messages from a stream within a specific {@link Range}. * diff --git a/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java b/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java index e52cb57fb9..75be17ff42 100644 --- a/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java @@ -241,6 +241,15 @@ public interface NodeSelectionStreamCommands { */ Executions> xpending(K key, Consumer consumer, Range range, Limit limit); + /** + * Read pending messages from a stream within a specific {@link XPendingArgs}. + * + * @param key the stream key. + * @param args + * @return List<Object> array-reply list with members of the resulting stream. + */ + Executions> xpending(K key, XPendingArgs args); + /** * Read messages from a stream within a specific {@link Range}. * diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommands.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommands.kt index 33d94c95bf..05f56a5482 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommands.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommands.kt @@ -241,6 +241,15 @@ interface RedisStreamCoroutinesCommands { */ fun xpending(key: K, consumer: Consumer, range: Range, limit: Limit): Flow + /** + * Read pending messages from a stream within a specific [XPendingArgs]. + * + * @param key the stream key. + * @param args + * @return List array-reply list with members of the resulting stream. + */ + fun xpending(key: K, args: XPendingArgs): Flow + /** * Read messages from a stream within a specific [Range]. * diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommandsImpl.kt index 05d18c3246..0a8f041e46 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommandsImpl.kt @@ -79,6 +79,8 @@ internal class RedisStreamCoroutinesCommandsImpl(internal val override fun xpending(key: K, consumer: Consumer, range: Range, limit: Limit): Flow = ops.xpending(key, consumer, range, limit).asFlow() + override fun xpending(key: K, args: XPendingArgs): Flow = ops.xpending(key, args).asFlow() + override fun xrange(key: K, range: Range): Flow> = ops.xrange(key, range).asFlow() override fun xrange(key: K, range: Range, limit: Limit): Flow> = ops.xrange(key, range, limit).asFlow() diff --git a/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java b/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java index 2432c816cd..6e63789d39 100644 --- a/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java +++ b/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java @@ -247,6 +247,15 @@ public interface RedisStreamCommands { */ List xpending(K key, Consumer consumer, Range range, Limit limit); + /** + * Read pending messages from a stream within a specific {@link XPendingArgs}. + * + * @param key the stream key. + * @param args + * @return List<Object> array-reply list with members of the resulting stream. + */ + List xpending(K key, XPendingArgs args); + /** * Read messages from a stream within a specific {@link Range}. * diff --git a/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java b/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java index a1f8d7e549..b933b519a7 100644 --- a/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java +++ b/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java @@ -28,6 +28,7 @@ import javax.inject.Inject; +import io.lettuce.core.XPendingArgs; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -410,6 +411,23 @@ void xpending() { assertThat(message.getRedeliveryCount()).isEqualTo(1); } + @Test + void xpendingWithArgs() { + + redis.xgroupCreate(StreamOffset.latest(key), "group", XGroupCreateArgs.Builder.mkstream()); + String id = redis.xadd(key, Collections.singletonMap("key", "value")); + + redis.xreadgroup(Consumer.from("group", "consumer1"), StreamOffset.lastConsumed(key)); + + List pendingEntries = redis.xpending(key, XPendingArgs.Builder + .xpending(Consumer.from("group", "consumer1"), Range.unbounded(), Limit.from(10)).idle(30L)); + + PendingMessage message = pendingEntries.get(0); + assertThat(message.getId()).isEqualTo(id); + assertThat(message.getConsumer()).isEqualTo("consumer1"); + assertThat(message.getRedeliveryCount()).isEqualTo(1); + } + @Test void xpendingWithoutMessages() {