Skip to content

Commit

Permalink
Add support for IDLE option in XPENDING
Browse files Browse the repository at this point in the history
  • Loading branch information
dengliming committed Mar 2, 2021
1 parent c3337f5 commit 9833d79
Show file tree
Hide file tree
Showing 13 changed files with 214 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -1722,6 +1722,11 @@ public RedisFuture<List<PendingMessage>> xpending(K key, Consumer<K> consumer, R
return dispatch(commandBuilder.xpending(key, consumer, range, limit));
}

@Override
public RedisFuture<List<PendingMessage>> xpending(K key, XPendingArgs<K> args) {
return dispatch(commandBuilder.xpending(key, args));
}

@Override
public RedisFuture<List<StreamMessage<K, V>>> xrange(K key, Range<String> range) {
return dispatch(commandBuilder.xrange(key, range, Limit.unlimited()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1799,6 +1799,11 @@ public Flux<PendingMessage> xpending(K key, Consumer<K> consumer, Range<String>
return createDissolvingFlux(() -> commandBuilder.xpending(key, consumer, range, limit));
}

@Override
public Flux<PendingMessage> xpending(K key, XPendingArgs<K> args) {
return createDissolvingFlux(() -> commandBuilder.xpending(key, args));
}

@Override
public Flux<StreamMessage<K, V>> xrange(K key, Range<String> range) {
return createDissolvingFlux(() -> commandBuilder.xrange(key, range, Limit.unlimited()));
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/io/lettuce/core/RedisCommandBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -2469,6 +2469,15 @@ public Command<K, V, List<PendingMessage>> xpending(K key, Consumer<K> consumer,
return createCommand(XPENDING, new PendingMessageListOutput<>(codec), args);
}

public Command<K, V, List<PendingMessage>> xpending(K key, XPendingArgs<K> commandArgs) {
notNullKey(key);
LettuceAssert.notNull(commandArgs, "XPendingArgs " + MUST_NOT_BE_NULL);

CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(key);
commandArgs.build(args);
return createCommand(XPENDING, new PendingMessageListOutput<>(codec), args);
}

public Command<K, V, List<StreamMessage<K, V>>> xrange(K key, Range<String> range, Limit limit) {
notNullKey(key);
LettuceAssert.notNull(range, "Range " + MUST_NOT_BE_NULL);
Expand Down
112 changes: 112 additions & 0 deletions src/main/java/io/lettuce/core/XPendingArgs.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2018-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core;

import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.CommandArgs;
import io.lettuce.core.protocol.CommandKeyword;

/**
* Argument list builder for the Redis <a href="http://redis.io/commands/xpending">XPENDING</a> command.
* Static import the methods from {@link XPendingArgs.Builder} and call the methods: {@code block(…)} .
* <p>
* {@link XPendingArgs} is a mutable object and instances should be used only once to avoid shared mutable state.
*
* @author dengliming
* @since 6.1
*/
public class XPendingArgs<K> {

private Consumer<K> consumer;

private Range<String> range;

private Limit limit;

private Long idle;

/**
* Builder entry points for {@link XPendingArgs}.
*/
public static class Builder {

/**
* Utility constructor.
*/
private Builder() {
}

/**
* Create a new {@link XPendingArgs} .
*
* @param consumer
* @param range
* @param limit
*/
public static <K> XPendingArgs xpending(Consumer<K> consumer, Range<String> range, Limit limit) {
return new XPendingArgs().consumer(consumer).range(range).limit(limit);
}
}

public XPendingArgs range(Range<String> range) {
LettuceAssert.notNull(range, "Range must not be null");

this.range = range;
return this;
}

public XPendingArgs consumer(Consumer<K> consumer) {
LettuceAssert.notNull(consumer, "Consumer must not be null");

this.consumer = consumer;
return this;
}

public XPendingArgs limit(Limit limit) {
LettuceAssert.notNull(limit, "Limit must not be null");

this.limit = limit;
return this;
}

public XPendingArgs idle(Long idle) {
this.idle = idle;
return this;
}

public <V> void build(CommandArgs<K, V> args) {
args.addKey(consumer.group);

if (idle != null) {
args.add(CommandKeyword.IDLE).add(idle);
}

if (range.getLower().equals(Range.Boundary.unbounded())) {
args.add("-");
} else {
args.add(range.getLower().getValue());
}

if (range.getUpper().equals(Range.Boundary.unbounded())) {
args.add("+");
} else {
args.add(range.getUpper().getValue());
}

args.add(limit.isLimited() ? limit.getCount() : Long.MAX_VALUE);
args.addKey(consumer.name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,15 @@ public interface RedisStreamAsyncCommands<K, V> {
*/
RedisFuture<List<PendingMessage>> xpending(K key, Consumer<K> consumer, Range<String> range, Limit limit);

/**
* Read pending messages from a stream within a specific {@link XPendingArgs}.
*
* @param key the stream key.
* @param args
* @return List&lt;Object&gt; array-reply list with members of the resulting stream.
*/
RedisFuture<List<PendingMessage>> xpending(K key, XPendingArgs<K> args);

/**
* Read messages from a stream within a specific {@link Range}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,15 @@ public interface RedisStreamReactiveCommands<K, V> {
*/
Flux<PendingMessage> xpending(K key, Consumer<K> consumer, Range<String> range, Limit limit);

/**
* Read pending messages from a stream within a specific {@link XPendingArgs}.
*
* @param key the stream key.
* @param args
* @return Object array-reply list with members of the resulting stream.
*/
Flux<PendingMessage> xpending(K key, XPendingArgs<K> args);

/**
* Read messages from a stream within a specific {@link Range}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,15 @@ public interface RedisStreamCommands<K, V> {
*/
List<PendingMessage> xpending(K key, Consumer<K> consumer, Range<String> range, Limit limit);

/**
* Read pending messages from a stream within a specific {@link XPendingArgs}.
*
* @param key the stream key.
* @param args
* @return List&lt;Object&gt; array-reply list with members of the resulting stream.
*/
List<PendingMessage> xpending(K key, XPendingArgs<K> args);

/**
* Read messages from a stream within a specific {@link Range}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,15 @@ public interface NodeSelectionStreamAsyncCommands<K, V> {
*/
AsyncExecutions<List<PendingMessage>> xpending(K key, Consumer<K> consumer, Range<String> range, Limit limit);

/**
* Read pending messages from a stream within a specific {@link XPendingArgs}.
*
* @param key the stream key.
* @param args
* @return List&lt;Object&gt; array-reply list with members of the resulting stream.
*/
AsyncExecutions<List<PendingMessage>> xpending(K key, XPendingArgs<K> args);

/**
* Read messages from a stream within a specific {@link Range}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,15 @@ public interface NodeSelectionStreamCommands<K, V> {
*/
Executions<List<PendingMessage>> xpending(K key, Consumer<K> consumer, Range<String> range, Limit limit);

/**
* Read pending messages from a stream within a specific {@link XPendingArgs}.
*
* @param key the stream key.
* @param args
* @return List&lt;Object&gt; array-reply list with members of the resulting stream.
*/
Executions<List<PendingMessage>> xpending(K key, XPendingArgs<K> args);

/**
* Read messages from a stream within a specific {@link Range}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,15 @@ interface RedisStreamCoroutinesCommands<K : Any, V : Any> {
*/
fun xpending(key: K, consumer: Consumer<K>, range: Range<String>, limit: Limit): Flow<PendingMessage>

/**
* Read pending messages from a stream within a specific [XPendingArgs].
*
* @param key the stream key.
* @param args
* @return List<Any> array-reply list with members of the resulting stream.
*/
fun xpending(key: K, args: XPendingArgs<K>): Flow<PendingMessage>

/**
* Read messages from a stream within a specific [Range].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ internal class RedisStreamCoroutinesCommandsImpl<K : Any, V : Any>(internal val

override fun xpending(key: K, consumer: Consumer<K>, range: Range<String>, limit: Limit): Flow<PendingMessage> = ops.xpending(key, consumer, range, limit).asFlow()

override fun xpending(key: K, args: XPendingArgs<K>): Flow<PendingMessage> = ops.xpending(key, args).asFlow()

override fun xrange(key: K, range: Range<String>): Flow<StreamMessage<K, V>> = ops.xrange(key, range).asFlow()

override fun xrange(key: K, range: Range<String>, limit: Limit): Flow<StreamMessage<K, V>> = ops.xrange(key, range, limit).asFlow()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,15 @@ public interface RedisStreamCommands<K, V> {
*/
List<PendingMessage> xpending(K key, Consumer<K> consumer, Range<String> range, Limit limit);

/**
* Read pending messages from a stream within a specific {@link XPendingArgs}.
*
* @param key the stream key.
* @param args
* @return List&lt;Object&gt; array-reply list with members of the resulting stream.
*/
List<PendingMessage> xpending(K key, XPendingArgs<K> args);

/**
* Read messages from a stream within a specific {@link Range}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import javax.inject.Inject;

import io.lettuce.core.XPendingArgs;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
Expand Down Expand Up @@ -410,6 +411,23 @@ void xpending() {
assertThat(message.getRedeliveryCount()).isEqualTo(1);
}

@Test
void xpendingWithArgs() {

redis.xgroupCreate(StreamOffset.latest(key), "group", XGroupCreateArgs.Builder.mkstream());
String id = redis.xadd(key, Collections.singletonMap("key", "value"));

redis.xreadgroup(Consumer.from("group", "consumer1"), StreamOffset.lastConsumed(key));

List<PendingMessage> pendingEntries = redis.xpending(key, XPendingArgs.Builder
.xpending(Consumer.from("group", "consumer1"), Range.unbounded(), Limit.from(10)).idle(30L));

PendingMessage message = pendingEntries.get(0);
assertThat(message.getId()).isEqualTo(id);
assertThat(message.getConsumer()).isEqualTo("consumer1");
assertThat(message.getRedeliveryCount()).isEqualTo(1);
}

@Test
void xpendingWithoutMessages() {

Expand Down

0 comments on commit 9833d79

Please sign in to comment.