Skip to content

Commit

Permalink
Support JUSTID flag of XCLAIM command #1233
Browse files Browse the repository at this point in the history
We now support XClaimArgs.justid() that requests just the message id without returning the body.
  • Loading branch information
mp911de committed Feb 21, 2020
1 parent 905fac4 commit 02330b2
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 5 deletions.
3 changes: 3 additions & 0 deletions src/main/java/io/lettuce/core/StreamMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public String getId() {
return id;
}

/**
* @return the message body. Can be {@literal null} for commands that do not return the message body.
*/
public Map<K, V> getBody() {
return body;
}
Expand Down
30 changes: 30 additions & 0 deletions src/main/java/io/lettuce/core/XClaimArgs.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class XClaimArgs {
private Long time;
private Long retrycount;
private boolean force;
private boolean justid;

/**
* Builder entry points for {@link XAddArgs}.
Expand All @@ -52,6 +53,18 @@ public static class Builder {
private Builder() {
}

/**
* Creates new {@link XClaimArgs} and set the {@code JUSTID} flag to return just the message id and do not increment the
* retry counter. The message body is not returned when calling {@code XCLAIM}.
*
* @return new {@link XClaimArgs} with min idle time set.
* @see XClaimArgs#justid()
* @since 5.3
*/
public static XClaimArgs justid() {
return new XClaimArgs().justid();
}

public static XClaimArgs minIdleTime(long milliseconds) {
return new XClaimArgs().minIdleTime(milliseconds);
}
Expand All @@ -70,6 +83,19 @@ public static XClaimArgs minIdleTime(Duration minIdleTime) {
}
}

/**
* Set the {@code JUSTID} flag to return just the message id and do not increment the retry counter. The message body is not
* returned when calling {@code XCLAIM}.
*
* @return {@code this}.
* @since 5.3
*/
public XClaimArgs justid() {

this.justid = true;
return this;
}

/**
* Return only messages that are idle for at least {@code milliseconds}.
*
Expand Down Expand Up @@ -206,5 +232,9 @@ public <K, V> void build(CommandArgs<K, V> args) {
if (force) {
args.add(CommandKeyword.FORCE);
}

if (justid) {
args.add(CommandKeyword.JUSTID);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ public interface RedisStreamAsyncCommands<K, V> {

/**
* Gets ownership of one or multiple messages in the Pending Entries List of a given stream consumer group.
* <p>
* Note that setting the {@code JUSTID} flag (calling this method with {@link XClaimArgs#justid()}) suppresses the message
* bode and {@link StreamMessage#getBody()} is {@code null}.
*
* @param key the stream key.
* @param consumer consumer identified by group name and consumer key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ public interface RedisStreamReactiveCommands<K, V> {

/**
* Gets ownership of one or multiple messages in the Pending Entries List of a given stream consumer group.
* <p>
* Note that setting the {@code JUSTID} flag (calling this method with {@link XClaimArgs#justid()}) suppresses the message
* bode and {@link StreamMessage#getBody()} is {@code null}.
*
* @param key the stream key.
* @param consumer consumer identified by group name and consumer key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ public interface RedisStreamCommands<K, V> {

/**
* Gets ownership of one or multiple messages in the Pending Entries List of a given stream consumer group.
* <p>
* Note that setting the {@code JUSTID} flag (calling this method with {@link XClaimArgs#justid()}) suppresses the message
* bode and {@link StreamMessage#getBody()} is {@code null}.
*
* @param key the stream key.
* @param consumer consumer identified by group name and consumer key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ public interface NodeSelectionStreamAsyncCommands<K, V> {

/**
* Gets ownership of one or multiple messages in the Pending Entries List of a given stream consumer group.
* <p>
* Note that setting the {@code JUSTID} flag (calling this method with {@link XClaimArgs#justid()}) suppresses the message
* bode and {@link StreamMessage#getBody()} is {@code null}.
*
* @param key the stream key.
* @param consumer consumer identified by group name and consumer key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ public interface NodeSelectionStreamCommands<K, V> {

/**
* Gets ownership of one or multiple messages in the Pending Entries List of a given stream consumer group.
* <p>
* Note that setting the {@code JUSTID} flag (calling this method with {@link XClaimArgs#justid()}) suppresses the message
* bode and {@link StreamMessage#getBody()} is {@code null}.
*
* @param key the stream key.
* @param consumer consumer identified by group name and consumer key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import java.util.List;
import java.util.Map;

import io.lettuce.core.*;
import io.lettuce.core.Limit;
import io.lettuce.core.Range;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XClaimArgs;
import io.lettuce.core.XReadArgs.StreamOffset;

/**
Expand Down Expand Up @@ -92,6 +95,9 @@ public interface RedisStreamCommands<K, V> {

/**
* Gets ownership of one or multiple messages in the Pending Entries List of a given stream consumer group.
* <p>
* Note that setting the {@code JUSTID} flag (calling this method with {@link XClaimArgs#justid()}) suppresses the message
* bode and {@link StreamMessage#getBody()} is {@code null}.
*
* @param key the stream key.
* @param consumer consumer identified by group name and consumer key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ void xinfoGroups() {
@Test
void xinfoConsumers() {

assertThat(redis.xgroupCreate(StreamOffset.from(key, "0-0"), "group", XGroupCreateArgs.Builder.mkstream())).isEqualTo(
"OK");
assertThat(redis.xgroupCreate(StreamOffset.from(key, "0-0"), "group", XGroupCreateArgs.Builder.mkstream()))
.isEqualTo("OK");
redis.xadd(key, Collections.singletonMap("key1", "value1"));

redis.xreadgroup(Consumer.from("group", "consumer1"), StreamOffset.lastConsumed(key));
Expand All @@ -271,8 +271,8 @@ void xgroupCreate() {

assertThat(redis.xgroupCreate(StreamOffset.latest(key), "group", XGroupCreateArgs.Builder.mkstream())).isEqualTo("OK");

List<Object> groups = redis.dispatch(XINFO, new NestedMultiOutput<>(StringCodec.UTF8), new CommandArgs<>(
StringCodec.UTF8).add("GROUPS").add(key));
List<Object> groups = redis.dispatch(XINFO, new NestedMultiOutput<>(StringCodec.UTF8),
new CommandArgs<>(StringCodec.UTF8).add("GROUPS").add(key));

assertThat(groups).isNotEmpty();
assertThat(redis.type(key)).isEqualTo("stream");
Expand Down Expand Up @@ -379,6 +379,28 @@ void xclaimWithArgs() {
assertThat(message.getMsSinceLastDelivery()).isBetween(50000L, 80000L);
}

@Test
void xclaimJustId() {

String id1 = redis.xadd(key, Collections.singletonMap("key", "value"));
redis.xgroupCreate(StreamOffset.latest(key), "group");
String id2 = redis.xadd(key, Collections.singletonMap("key", "value"));
String id3 = redis.xadd(key, Collections.singletonMap("key", "value"));

redis.xreadgroup(Consumer.from("group", "consumer1"), StreamOffset.lastConsumed(key));

List<StreamMessage<String, String>> claimedMessages = redis.xclaim(key, Consumer.from("group", "consumer2"),
XClaimArgs.Builder.justid(), id1, id2, id3);

assertThat(claimedMessages).hasSize(2);

StreamMessage<String, String> message = claimedMessages.get(0);

assertThat(message.getBody()).isNull();
assertThat(message.getStream()).isEqualTo("key");
assertThat(message.getId()).isEqualTo(id2);
}

@Test
void xgroupDestroy() {

Expand Down

0 comments on commit 02330b2

Please sign in to comment.