Skip to content

Commit

Permalink
Add support for XGROUP CREATE … MKSTREAM #898
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Nov 19, 2018
1 parent 83ffcae commit 7f0c98a
Show file tree
Hide file tree
Showing 11 changed files with 176 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1521,7 +1521,12 @@ public RedisFuture<Long> xdel(K key, String... messageIds) {

@Override
public RedisFuture<String> xgroupCreate(XReadArgs.StreamOffset<K> offset, K group) {
return dispatch(commandBuilder.xgroupCreate(offset, group));
return dispatch(commandBuilder.xgroupCreate(offset, group, null));
}

@Override
public RedisFuture<String> xgroupCreate(XReadArgs.StreamOffset<K> offset, K group, XGroupCreateArgs args) {
return dispatch(commandBuilder.xgroupCreate(offset, group, args));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1573,7 +1573,12 @@ public Mono<Long> xdel(K key, String... messageIds) {

@Override
public Mono<String> xgroupCreate(XReadArgs.StreamOffset<K> streamOffset, K group) {
return createMono(() -> commandBuilder.xgroupCreate(streamOffset, group));
return createMono(() -> commandBuilder.xgroupCreate(streamOffset, group, null));
}

@Override
public Mono<String> xgroupCreate(XReadArgs.StreamOffset<K> streamOffset, K group, XGroupCreateArgs args) {
return createMono(() -> commandBuilder.xgroupCreate(streamOffset, group, args));
}

@Override
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/io/lettuce/core/RedisCommandBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -2140,13 +2140,17 @@ public Command<K, V, Long> xdel(K key, String[] messageIds) {
return createCommand(XDEL, new IntegerOutput<>(codec), args);
}

public Command<K, V, String> xgroupCreate(StreamOffset<K> offset, K group) {
public Command<K, V, String> xgroupCreate(StreamOffset<K> offset, K group, XGroupCreateArgs commandArgs) {
LettuceAssert.notNull(offset, "StreamOffset " + MUST_NOT_BE_NULL);
LettuceAssert.notNull(group, "Group " + MUST_NOT_BE_NULL);

CommandArgs<K, V> args = new CommandArgs<>(codec).add(CREATE).addKey(offset.getName()).addKey(group)
.add(offset.getOffset());

if (commandArgs != null) {
commandArgs.build(args);
}

return createCommand(XGROUP, new StatusOutput<>(codec), args);
}

Expand Down
84 changes: 84 additions & 0 deletions src/main/java/io/lettuce/core/XGroupCreateArgs.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core;

import io.lettuce.core.protocol.CommandArgs;

/**
* Argument list builder for the Redis <a href="http://redis.io/commands/xgroup">XGROUP</a> CREATE command. Static import the
* methods from {@link Builder} and call the methods: {@code mkstream(…)} .
* <p/>
* {@link XGroupCreateArgs} is a mutable object and instances should be used only once to avoid shared mutable state.
*
* @author Mark Paluch
* @since 5.2
*/
public class XGroupCreateArgs {

private boolean mkstream;

/**
* Builder entry points for {@link XGroupCreateArgs}.
*/
public static class Builder {

/**
* Utility constructor.
*/
private Builder() {
}

/**
* Creates new {@link XGroupCreateArgs} and setting {@literal MKSTREAM}.
*
* @return new {@link XGroupCreateArgs} with {@literal MKSTREAM} set.
* @see XGroupCreateArgs#mkstream(boolean)
*/
public static XGroupCreateArgs mkstream() {
return mkstream(true);
}

/**
* Creates new {@link XGroupCreateArgs} and setting {@literal MKSTREAM}.
*
* @param mkstream whether to apply {@literal MKSTREAM}.
* @return new {@link XGroupCreateArgs} with {@literal MKSTREAM} set.
* @see XGroupCreateArgs#mkstream(boolean)
*/
public static XGroupCreateArgs mkstream(boolean mkstream) {
return new XGroupCreateArgs().mkstream(mkstream);
}
}

/**
* Make a stream if it does not exists.
*
* @param mkstream whether to apply {@literal MKSTREAM}
* @return {@code this}
*/
public XGroupCreateArgs mkstream(boolean mkstream) {

this.mkstream = mkstream;
return this;
}

public <K, V> void build(CommandArgs<K, V> args) {

if (mkstream) {
args.add("MKSTREAM");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,17 @@ public interface RedisStreamAsyncCommands<K, V> {
*/
RedisFuture<String> xgroupCreate(StreamOffset<K> streamOffset, K group);

/**
* Create a consumer group.
*
* @param streamOffset name of the stream containing the offset to set.
* @param group name of the consumer group.
* @param args
* @return simple-reply {@literal true} if successful.
* @since 5.2
*/
RedisFuture<String> xgroupCreate(StreamOffset<K> streamOffset, K group, XGroupCreateArgs args);

/**
* Delete a consumer from a consumer group.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,17 @@ public interface RedisStreamReactiveCommands<K, V> {
*/
Mono<String> xgroupCreate(StreamOffset<K> streamOffset, K group);

/**
* Create a consumer group.
*
* @param streamOffset name of the stream containing the offset to set.
* @param group name of the consumer group.
* @param args
* @return simple-reply {@literal true} if successful.
* @since 5.2
*/
Mono<String> xgroupCreate(StreamOffset<K> streamOffset, K group, XGroupCreateArgs args);

/**
* Delete a consumer from a consumer group.
*
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,17 @@ public interface RedisStreamCommands<K, V> {
*/
String xgroupCreate(StreamOffset<K> streamOffset, K group);

/**
* Create a consumer group.
*
* @param streamOffset name of the stream containing the offset to set.
* @param group name of the consumer group.
* @param args
* @return simple-reply {@literal true} if successful.
* @since 5.2
*/
String xgroupCreate(StreamOffset<K> streamOffset, K group, XGroupCreateArgs args);

/**
* Delete a consumer from a consumer group.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,17 @@ public interface NodeSelectionStreamAsyncCommands<K, V> {
*/
AsyncExecutions<String> xgroupCreate(StreamOffset<K> streamOffset, K group);

/**
* Create a consumer group.
*
* @param streamOffset name of the stream containing the offset to set.
* @param group name of the consumer group.
* @param args
* @return simple-reply {@literal true} if successful.
* @since 5.2
*/
AsyncExecutions<String> xgroupCreate(StreamOffset<K> streamOffset, K group, XGroupCreateArgs args);

/**
* Delete a consumer from a consumer group.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,17 @@ public interface NodeSelectionStreamCommands<K, V> {
*/
Executions<String> xgroupCreate(StreamOffset<K> streamOffset, K group);

/**
* Create a consumer group.
*
* @param streamOffset name of the stream containing the offset to set.
* @param group name of the consumer group.
* @param args
* @return simple-reply {@literal true} if successful.
* @since 5.2
*/
Executions<String> xgroupCreate(StreamOffset<K> streamOffset, K group, XGroupCreateArgs args);

/**
* Delete a consumer from a consumer group.
*
Expand Down
11 changes: 11 additions & 0 deletions src/main/templates/io/lettuce/core/api/RedisStreamCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,17 @@ public interface RedisStreamCommands<K, V> {
*/
String xgroupCreate(StreamOffset<K> streamOffset, K group);

/**
* Create a consumer group.
*
* @param streamOffset name of the stream containing the offset to set.
* @param group name of the consumer group.
* @param args
* @return simple-reply {@literal true} if successful.
* @since 5.2
*/
String xgroupCreate(StreamOffset<K> streamOffset, K group, XGroupCreateArgs args);

/**
* Delete a consumer from a consumer group.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,13 @@ void xreadTransactional() {
@Test
void xgroupCreate() {

redis.xadd(key, Collections.singletonMap("key", "value"));

assertThat(redis.xgroupCreate(StreamOffset.latest(key), "group")).isEqualTo("OK");
assertThat(redis.xgroupCreate(StreamOffset.latest(key), "group", XGroupCreateArgs.Builder.mkstream())).isEqualTo("OK");

List<Object> groups = redis.dispatch(XINFO, new NestedMultiOutput<>(StringCodec.UTF8), new CommandArgs<>(
StringCodec.UTF8).add("GROUPS").add(key));

assertThat(groups).isNotEmpty();
assertThat(redis.type(key)).isEqualTo("stream");
}

@Test
Expand All @@ -263,8 +262,7 @@ void xgroupread() {
@Test
void xpendingWithGroup() {

redis.xadd(key, Collections.singletonMap("key", "value"));
redis.xgroupCreate(StreamOffset.latest(key), "group");
redis.xgroupCreate(StreamOffset.latest(key), "group", XGroupCreateArgs.Builder.mkstream());
String id = redis.xadd(key, Collections.singletonMap("key", "value"));

redis.xreadgroup(Consumer.from("group", "consumer1"), StreamOffset.lastConsumed(key));
Expand All @@ -276,8 +274,7 @@ void xpendingWithGroup() {
@Test
void xpending() {

redis.xadd(key, Collections.singletonMap("key", "value"));
redis.xgroupCreate(StreamOffset.latest(key), "group");
redis.xgroupCreate(StreamOffset.latest(key), "group", XGroupCreateArgs.Builder.mkstream());
String id = redis.xadd(key, Collections.singletonMap("key", "value"));

redis.xreadgroup(Consumer.from("group", "consumer1"), StreamOffset.lastConsumed(key));
Expand All @@ -296,8 +293,7 @@ void xpending() {
@Test
void xack() {

redis.xadd(key, Collections.singletonMap("key", "value"));
redis.xgroupCreate(StreamOffset.latest(key), "group");
redis.xgroupCreate(StreamOffset.latest(key), "group", XGroupCreateArgs.Builder.mkstream());
redis.xadd(key, Collections.singletonMap("key", "value"));

List<StreamMessage<String, String>> messages = redis.xreadgroup(Consumer.from("group", "consumer1"),
Expand All @@ -313,8 +309,7 @@ void xack() {
@Test
void xclaim() {

redis.xadd(key, Collections.singletonMap("key", "value"));
redis.xgroupCreate(StreamOffset.latest(key), "group");
redis.xgroupCreate(StreamOffset.latest(key), "group", XGroupCreateArgs.Builder.mkstream());
redis.xadd(key, Collections.singletonMap("key", "value"));

List<StreamMessage<String, String>> messages = redis.xreadgroup(Consumer.from("group", "consumer1"),
Expand Down Expand Up @@ -355,8 +350,7 @@ void xclaimWithArgs() {
@Test
void xgroupDestroy() {

redis.xadd(key, Collections.singletonMap("key", "value"));
redis.xgroupCreate(StreamOffset.latest(key), "group");
redis.xgroupCreate(StreamOffset.latest(key), "group", XGroupCreateArgs.Builder.mkstream());

assertThat(redis.xgroupDestroy(key, "group")).isTrue();
assertThat(redis.xgroupDestroy(key, "group")).isFalse();
Expand All @@ -365,8 +359,7 @@ void xgroupDestroy() {
@Test
void xgroupDelconsumer() {

redis.xadd(key, Collections.singletonMap("key", "value"));
redis.xgroupCreate(StreamOffset.latest(key), "group");
redis.xgroupCreate(StreamOffset.latest(key), "group", XGroupCreateArgs.Builder.mkstream());
redis.xadd(key, Collections.singletonMap("key", "value"));
redis.xreadgroup(Consumer.from("group", "consumer1"), StreamOffset.lastConsumed(key));

Expand All @@ -377,8 +370,7 @@ void xgroupDelconsumer() {
@Test
void xgroupSetid() {

redis.xadd(key, Collections.singletonMap("key", "value"));
redis.xgroupCreate(StreamOffset.latest(key), "group");
redis.xgroupCreate(StreamOffset.latest(key), "group", XGroupCreateArgs.Builder.mkstream());

assertThat(redis.xgroupSetid(StreamOffset.latest(key), "group")).isEqualTo("OK");
}
Expand Down

0 comments on commit 7f0c98a

Please sign in to comment.