diff --git a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java index 9604b0bba7..d4abfc6b02 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java @@ -1521,7 +1521,12 @@ public RedisFuture xdel(K key, String... messageIds) { @Override public RedisFuture xgroupCreate(XReadArgs.StreamOffset offset, K group) { - return dispatch(commandBuilder.xgroupCreate(offset, group)); + return dispatch(commandBuilder.xgroupCreate(offset, group, null)); + } + + @Override + public RedisFuture xgroupCreate(XReadArgs.StreamOffset offset, K group, XGroupCreateArgs args) { + return dispatch(commandBuilder.xgroupCreate(offset, group, args)); } @Override diff --git a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java index bd918bd0b8..5adc70d4f5 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java @@ -1573,7 +1573,12 @@ public Mono xdel(K key, String... messageIds) { @Override public Mono xgroupCreate(XReadArgs.StreamOffset streamOffset, K group) { - return createMono(() -> commandBuilder.xgroupCreate(streamOffset, group)); + return createMono(() -> commandBuilder.xgroupCreate(streamOffset, group, null)); + } + + @Override + public Mono xgroupCreate(XReadArgs.StreamOffset streamOffset, K group, XGroupCreateArgs args) { + return createMono(() -> commandBuilder.xgroupCreate(streamOffset, group, args)); } @Override diff --git a/src/main/java/io/lettuce/core/RedisCommandBuilder.java b/src/main/java/io/lettuce/core/RedisCommandBuilder.java index ef73bb1c1d..8b250ea8ae 100644 --- a/src/main/java/io/lettuce/core/RedisCommandBuilder.java +++ b/src/main/java/io/lettuce/core/RedisCommandBuilder.java @@ -2140,13 +2140,17 @@ public Command xdel(K key, String[] messageIds) { return createCommand(XDEL, new IntegerOutput<>(codec), args); } - public Command xgroupCreate(StreamOffset offset, K group) { + public Command xgroupCreate(StreamOffset offset, K group, XGroupCreateArgs commandArgs) { LettuceAssert.notNull(offset, "StreamOffset " + MUST_NOT_BE_NULL); LettuceAssert.notNull(group, "Group " + MUST_NOT_BE_NULL); CommandArgs args = new CommandArgs<>(codec).add(CREATE).addKey(offset.getName()).addKey(group) .add(offset.getOffset()); + if (commandArgs != null) { + commandArgs.build(args); + } + return createCommand(XGROUP, new StatusOutput<>(codec), args); } diff --git a/src/main/java/io/lettuce/core/XGroupCreateArgs.java b/src/main/java/io/lettuce/core/XGroupCreateArgs.java new file mode 100644 index 0000000000..0f0b021825 --- /dev/null +++ b/src/main/java/io/lettuce/core/XGroupCreateArgs.java @@ -0,0 +1,84 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core; + +import io.lettuce.core.protocol.CommandArgs; + +/** + * Argument list builder for the Redis XGROUP CREATE command. Static import the + * methods from {@link Builder} and call the methods: {@code mkstream(…)} . + *

+ * {@link XGroupCreateArgs} is a mutable object and instances should be used only once to avoid shared mutable state. + * + * @author Mark Paluch + * @since 5.2 + */ +public class XGroupCreateArgs { + + private boolean mkstream; + + /** + * Builder entry points for {@link XGroupCreateArgs}. + */ + public static class Builder { + + /** + * Utility constructor. + */ + private Builder() { + } + + /** + * Creates new {@link XGroupCreateArgs} and setting {@literal MKSTREAM}. + * + * @return new {@link XGroupCreateArgs} with {@literal MKSTREAM} set. + * @see XGroupCreateArgs#mkstream(boolean) + */ + public static XGroupCreateArgs mkstream() { + return mkstream(true); + } + + /** + * Creates new {@link XGroupCreateArgs} and setting {@literal MKSTREAM}. + * + * @param mkstream whether to apply {@literal MKSTREAM}. + * @return new {@link XGroupCreateArgs} with {@literal MKSTREAM} set. + * @see XGroupCreateArgs#mkstream(boolean) + */ + public static XGroupCreateArgs mkstream(boolean mkstream) { + return new XGroupCreateArgs().mkstream(mkstream); + } + } + + /** + * Make a stream if it does not exists. + * + * @param mkstream whether to apply {@literal MKSTREAM} + * @return {@code this} + */ + public XGroupCreateArgs mkstream(boolean mkstream) { + + this.mkstream = mkstream; + return this; + } + + public void build(CommandArgs args) { + + if (mkstream) { + args.add("MKSTREAM"); + } + } +} diff --git a/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java b/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java index b9df1b2493..1540a127d7 100644 --- a/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java +++ b/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java @@ -121,6 +121,17 @@ public interface RedisStreamAsyncCommands { */ RedisFuture xgroupCreate(StreamOffset streamOffset, K group); + /** + * Create a consumer group. + * + * @param streamOffset name of the stream containing the offset to set. + * @param group name of the consumer group. + * @param args + * @return simple-reply {@literal true} if successful. + * @since 5.2 + */ + RedisFuture xgroupCreate(StreamOffset streamOffset, K group, XGroupCreateArgs args); + /** * Delete a consumer from a consumer group. * diff --git a/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java b/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java index c3da1431a9..c774ff5463 100644 --- a/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java +++ b/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java @@ -122,6 +122,17 @@ public interface RedisStreamReactiveCommands { */ Mono xgroupCreate(StreamOffset streamOffset, K group); + /** + * Create a consumer group. + * + * @param streamOffset name of the stream containing the offset to set. + * @param group name of the consumer group. + * @param args + * @return simple-reply {@literal true} if successful. + * @since 5.2 + */ + Mono xgroupCreate(StreamOffset streamOffset, K group, XGroupCreateArgs args); + /** * Delete a consumer from a consumer group. * diff --git a/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java b/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java index d215bacac8..b67cf3855d 100644 --- a/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java +++ b/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java @@ -121,6 +121,17 @@ public interface RedisStreamCommands { */ String xgroupCreate(StreamOffset streamOffset, K group); + /** + * Create a consumer group. + * + * @param streamOffset name of the stream containing the offset to set. + * @param group name of the consumer group. + * @param args + * @return simple-reply {@literal true} if successful. + * @since 5.2 + */ + String xgroupCreate(StreamOffset streamOffset, K group, XGroupCreateArgs args); + /** * Delete a consumer from a consumer group. * diff --git a/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java b/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java index e03de55fb7..4cbe0e2c2b 100644 --- a/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java @@ -121,6 +121,17 @@ public interface NodeSelectionStreamAsyncCommands { */ AsyncExecutions xgroupCreate(StreamOffset streamOffset, K group); + /** + * Create a consumer group. + * + * @param streamOffset name of the stream containing the offset to set. + * @param group name of the consumer group. + * @param args + * @return simple-reply {@literal true} if successful. + * @since 5.2 + */ + AsyncExecutions xgroupCreate(StreamOffset streamOffset, K group, XGroupCreateArgs args); + /** * Delete a consumer from a consumer group. * diff --git a/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java b/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java index 9316a6be0a..cdca1cba4b 100644 --- a/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java @@ -121,6 +121,17 @@ public interface NodeSelectionStreamCommands { */ Executions xgroupCreate(StreamOffset streamOffset, K group); + /** + * Create a consumer group. + * + * @param streamOffset name of the stream containing the offset to set. + * @param group name of the consumer group. + * @param args + * @return simple-reply {@literal true} if successful. + * @since 5.2 + */ + Executions xgroupCreate(StreamOffset streamOffset, K group, XGroupCreateArgs args); + /** * Delete a consumer from a consumer group. * diff --git a/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java b/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java index cd3eafe1f6..09a482b748 100644 --- a/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java +++ b/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java @@ -120,6 +120,17 @@ public interface RedisStreamCommands { */ String xgroupCreate(StreamOffset streamOffset, K group); + /** + * Create a consumer group. + * + * @param streamOffset name of the stream containing the offset to set. + * @param group name of the consumer group. + * @param args + * @return simple-reply {@literal true} if successful. + * @since 5.2 + */ + String xgroupCreate(StreamOffset streamOffset, K group, XGroupCreateArgs args); + /** * Delete a consumer from a consumer group. * diff --git a/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java b/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java index 689970cef1..48fc966dcf 100644 --- a/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java +++ b/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java @@ -237,14 +237,13 @@ void xreadTransactional() { @Test void xgroupCreate() { - redis.xadd(key, Collections.singletonMap("key", "value")); - - assertThat(redis.xgroupCreate(StreamOffset.latest(key), "group")).isEqualTo("OK"); + assertThat(redis.xgroupCreate(StreamOffset.latest(key), "group", XGroupCreateArgs.Builder.mkstream())).isEqualTo("OK"); List groups = redis.dispatch(XINFO, new NestedMultiOutput<>(StringCodec.UTF8), new CommandArgs<>( StringCodec.UTF8).add("GROUPS").add(key)); assertThat(groups).isNotEmpty(); + assertThat(redis.type(key)).isEqualTo("stream"); } @Test @@ -263,8 +262,7 @@ void xgroupread() { @Test void xpendingWithGroup() { - redis.xadd(key, Collections.singletonMap("key", "value")); - redis.xgroupCreate(StreamOffset.latest(key), "group"); + redis.xgroupCreate(StreamOffset.latest(key), "group", XGroupCreateArgs.Builder.mkstream()); String id = redis.xadd(key, Collections.singletonMap("key", "value")); redis.xreadgroup(Consumer.from("group", "consumer1"), StreamOffset.lastConsumed(key)); @@ -276,8 +274,7 @@ void xpendingWithGroup() { @Test void xpending() { - redis.xadd(key, Collections.singletonMap("key", "value")); - redis.xgroupCreate(StreamOffset.latest(key), "group"); + redis.xgroupCreate(StreamOffset.latest(key), "group", XGroupCreateArgs.Builder.mkstream()); String id = redis.xadd(key, Collections.singletonMap("key", "value")); redis.xreadgroup(Consumer.from("group", "consumer1"), StreamOffset.lastConsumed(key)); @@ -296,8 +293,7 @@ void xpending() { @Test void xack() { - redis.xadd(key, Collections.singletonMap("key", "value")); - redis.xgroupCreate(StreamOffset.latest(key), "group"); + redis.xgroupCreate(StreamOffset.latest(key), "group", XGroupCreateArgs.Builder.mkstream()); redis.xadd(key, Collections.singletonMap("key", "value")); List> messages = redis.xreadgroup(Consumer.from("group", "consumer1"), @@ -313,8 +309,7 @@ void xack() { @Test void xclaim() { - redis.xadd(key, Collections.singletonMap("key", "value")); - redis.xgroupCreate(StreamOffset.latest(key), "group"); + redis.xgroupCreate(StreamOffset.latest(key), "group", XGroupCreateArgs.Builder.mkstream()); redis.xadd(key, Collections.singletonMap("key", "value")); List> messages = redis.xreadgroup(Consumer.from("group", "consumer1"), @@ -355,8 +350,7 @@ void xclaimWithArgs() { @Test void xgroupDestroy() { - redis.xadd(key, Collections.singletonMap("key", "value")); - redis.xgroupCreate(StreamOffset.latest(key), "group"); + redis.xgroupCreate(StreamOffset.latest(key), "group", XGroupCreateArgs.Builder.mkstream()); assertThat(redis.xgroupDestroy(key, "group")).isTrue(); assertThat(redis.xgroupDestroy(key, "group")).isFalse(); @@ -365,8 +359,7 @@ void xgroupDestroy() { @Test void xgroupDelconsumer() { - redis.xadd(key, Collections.singletonMap("key", "value")); - redis.xgroupCreate(StreamOffset.latest(key), "group"); + redis.xgroupCreate(StreamOffset.latest(key), "group", XGroupCreateArgs.Builder.mkstream()); redis.xadd(key, Collections.singletonMap("key", "value")); redis.xreadgroup(Consumer.from("group", "consumer1"), StreamOffset.lastConsumed(key)); @@ -377,8 +370,7 @@ void xgroupDelconsumer() { @Test void xgroupSetid() { - redis.xadd(key, Collections.singletonMap("key", "value")); - redis.xgroupCreate(StreamOffset.latest(key), "group"); + redis.xgroupCreate(StreamOffset.latest(key), "group", XGroupCreateArgs.Builder.mkstream()); assertThat(redis.xgroupSetid(StreamOffset.latest(key), "group")).isEqualTo("OK"); }