diff --git a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java index ab55044d2b..ccf5476e79 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java @@ -1636,6 +1636,11 @@ public RedisFuture xgroupCreate(XReadArgs.StreamOffset offset, K grou return dispatch(commandBuilder.xgroupCreate(offset, group, args)); } + @Override + public RedisFuture xgroupCreateconsumer(K key, Consumer consumer) { + return dispatch(commandBuilder.xgroupCreateconsumer(key, consumer)); + } + @Override public RedisFuture xgroupDelconsumer(K key, Consumer consumer) { return dispatch(commandBuilder.xgroupDelconsumer(key, consumer)); diff --git a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java index ebdd3c5c9d..18f2e9dcc3 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java @@ -1711,6 +1711,11 @@ public Mono xgroupCreate(XReadArgs.StreamOffset streamOffset, K group return createMono(() -> commandBuilder.xgroupCreate(streamOffset, group, args)); } + @Override + public Mono xgroupCreateconsumer(K key, Consumer consumer) { + return createMono(() -> commandBuilder.xgroupCreateconsumer(key, consumer)); + } + @Override public Mono xgroupDelconsumer(K key, Consumer consumer) { return createMono(() -> commandBuilder.xgroupDelconsumer(key, consumer)); diff --git a/src/main/java/io/lettuce/core/RedisCommandBuilder.java b/src/main/java/io/lettuce/core/RedisCommandBuilder.java index d10cc60cb7..71a73f2f4b 100644 --- a/src/main/java/io/lettuce/core/RedisCommandBuilder.java +++ b/src/main/java/io/lettuce/core/RedisCommandBuilder.java @@ -2314,6 +2314,16 @@ public Command xgroupCreate(StreamOffset offset, K group, XGrou return createCommand(XGROUP, new StatusOutput<>(codec), args); } + public Command xgroupCreateconsumer(K key, Consumer consumer) { + notNullKey(key); + LettuceAssert.notNull(consumer, "Consumer " + MUST_NOT_BE_NULL); + + CommandArgs args = new CommandArgs<>(codec).add("CREATECONSUMER").addKey(key).addKey(consumer.getGroup()) + .addKey(consumer.getName()); + + return createCommand(XGROUP, new BooleanOutput<>(codec), args); + } + public Command xgroupDelconsumer(K key, Consumer consumer) { notNullKey(key); LettuceAssert.notNull(consumer, "Consumer " + MUST_NOT_BE_NULL); diff --git a/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java b/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java index 4e6f77bd98..355e76f453 100644 --- a/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java +++ b/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java @@ -137,6 +137,16 @@ public interface RedisStreamAsyncCommands { */ RedisFuture xgroupCreate(StreamOffset streamOffset, K group, XGroupCreateArgs args); + /** + * Create a consumer from a consumer group. + * + * @param key the stream key. + * @param consumer consumer identified by group name and consumer key. + * @return simple-reply {@code true} if successful. + * @since 6.1 + */ + RedisFuture xgroupCreateconsumer(K key, Consumer consumer); + /** * Delete a consumer from a consumer group. * diff --git a/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java b/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java index 5fce2bdb6e..52ccda75f8 100644 --- a/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java +++ b/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java @@ -138,6 +138,16 @@ public interface RedisStreamReactiveCommands { */ Mono xgroupCreate(StreamOffset streamOffset, K group, XGroupCreateArgs args); + /** + * Create a consumer from a consumer group. + * + * @param key the stream key. + * @param consumer consumer identified by group name and consumer key. + * @return simple-reply {@code true} if successful. + * @since 6.1 + */ + Mono xgroupCreateconsumer(K key, Consumer consumer); + /** * Delete a consumer from a consumer group. * diff --git a/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java b/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java index b5331a2839..d84516b764 100644 --- a/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java +++ b/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java @@ -137,6 +137,16 @@ public interface RedisStreamCommands { */ String xgroupCreate(StreamOffset streamOffset, K group, XGroupCreateArgs args); + /** + * Create a consumer from a consumer group. + * + * @param key the stream key. + * @param consumer consumer identified by group name and consumer key. + * @return simple-reply {@code true} if successful. + * @since 6.1 + */ + Boolean xgroupCreateconsumer(K key, Consumer consumer); + /** * Delete a consumer from a consumer group. * diff --git a/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java b/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java index d552ab262b..08bd86985c 100644 --- a/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java @@ -137,6 +137,16 @@ public interface NodeSelectionStreamAsyncCommands { */ AsyncExecutions xgroupCreate(StreamOffset streamOffset, K group, XGroupCreateArgs args); + /** + * Create a consumer from a consumer group. + * + * @param key the stream key. + * @param consumer consumer identified by group name and consumer key. + * @return simple-reply {@code true} if successful. + * @since 6.1 + */ + AsyncExecutions xgroupCreateconsumer(K key, Consumer consumer); + /** * Delete a consumer from a consumer group. * diff --git a/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java b/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java index 5cea1a76e3..d0a8e69492 100644 --- a/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java @@ -137,6 +137,16 @@ public interface NodeSelectionStreamCommands { */ Executions xgroupCreate(StreamOffset streamOffset, K group, XGroupCreateArgs args); + /** + * Create a consumer from a consumer group. + * + * @param key the stream key. + * @param consumer consumer identified by group name and consumer key. + * @return simple-reply {@code true} if successful. + * @since 6.1 + */ + Executions xgroupCreateconsumer(K key, Consumer consumer); + /** * Delete a consumer from a consumer group. * diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommands.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommands.kt index b3603eb2b8..79a1f6aab1 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommands.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommands.kt @@ -137,6 +137,16 @@ interface RedisStreamCoroutinesCommands { */ suspend fun xgroupCreate(streamOffset: StreamOffset, group: K, args: XGroupCreateArgs): String? + /** + * Create a consumer from a consumer group. + * + * @param key the stream key. + * @param consumer consumer identified by group name and consumer key. + * @return simple-reply `true` if successful. + * @since 6.1 + */ + suspend fun xgroupCreateconsumer(key: K, consumer: Consumer): Boolean? + /** * Delete a consumer from a consumer group. * diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommandsImpl.kt index 8b1183cd11..62cb87161c 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommandsImpl.kt @@ -57,6 +57,8 @@ internal class RedisStreamCoroutinesCommandsImpl(internal val override suspend fun xgroupCreate(streamOffset: StreamOffset, group: K, args: XGroupCreateArgs): String? = ops.xgroupCreate(streamOffset, group, args).awaitFirstOrNull() + override suspend fun xgroupCreateconsumer(key: K, consumer: Consumer): Boolean? = ops.xgroupCreateconsumer(key, consumer).awaitFirstOrNull() + override suspend fun xgroupDelconsumer(key: K, consumer: Consumer): Long? = ops.xgroupDelconsumer(key, consumer).awaitFirstOrNull() override suspend fun xgroupDestroy(key: K, group: K): Boolean? = ops.xgroupDestroy(key, group).awaitFirstOrNull() diff --git a/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java b/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java index fe00dce798..6d8bb92f36 100644 --- a/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java +++ b/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java @@ -143,6 +143,16 @@ public interface RedisStreamCommands { */ String xgroupCreate(StreamOffset streamOffset, K group, XGroupCreateArgs args); + /** + * Create a consumer from a consumer group. + * + * @param key the stream key. + * @param consumer consumer identified by group name and consumer key. + * @return simple-reply {@code true} if successful. + * @since 6.1 + */ + Boolean xgroupCreateconsumer(K key, Consumer consumer); + /** * Delete a consumer from a consumer group. * diff --git a/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java b/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java index 8825058381..8bc7766f17 100644 --- a/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java +++ b/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java @@ -331,6 +331,17 @@ void xgroupCreate() { assertThat(redis.type(key)).isEqualTo("stream"); } + @Test + @EnabledOnCommand("LMOVE") // Redis 6.2 + void xgroupCreateconsumer() { + + redis.xgroupCreate(StreamOffset.latest(key), "group", XGroupCreateArgs.Builder.mkstream()); + redis.xadd(key, Collections.singletonMap("key", "value")); + + assertThat(redis.xgroupCreateconsumer(key, Consumer.from("group", "consumer1"))).isTrue(); + assertThat(redis.xgroupCreateconsumer(key, Consumer.from("group", "consumer1"))).isFalse(); + } + @Test void xgroupread() {