diff --git a/CHANGELOG.md b/CHANGELOG.md index 87f5b5a2d4..3f1ab43d19 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,7 @@ * Python: Added COPY command ([#1626](https://github.com/aws/glide-for-redis/pull/1626)) * Python: Added XREVRANGE command ([#1625](https://github.com/aws/glide-for-redis/pull/1625)) * Python: Added XREAD command ([#1644](https://github.com/aws/glide-for-redis/pull/1644)) +* Python: Added XGROUP CREATE and XGROUP DESTROY commands ([#1646](https://github.com/aws/glide-for-redis/pull/1646)) ### Breaking Changes * Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494)) diff --git a/python/python/glide/__init__.py b/python/python/glide/__init__.py index dc8881a8e6..40133df2cf 100644 --- a/python/python/glide/__init__.py +++ b/python/python/glide/__init__.py @@ -52,6 +52,7 @@ MaxId, MinId, StreamAddOptions, + StreamGroupOptions, StreamRangeBound, StreamReadOptions, StreamTrimOptions, @@ -159,6 +160,7 @@ "MaxId", "MinId", "StreamAddOptions", + "StreamGroupOptions", "StreamRangeBound", "StreamReadOptions", "StreamTrimOptions", diff --git a/python/python/glide/async_commands/core.py b/python/python/glide/async_commands/core.py index f509d647bc..fc8c8a2185 100644 --- a/python/python/glide/async_commands/core.py +++ b/python/python/glide/async_commands/core.py @@ -47,6 +47,7 @@ ) from glide.async_commands.stream import ( StreamAddOptions, + StreamGroupOptions, StreamRangeBound, StreamReadOptions, StreamTrimOptions, @@ -2802,6 +2803,65 @@ async def xread( await self._execute_command(RequestType.XRead, args), ) + async def xgroup_create( + self, + key: str, + group_name: str, + group_id: str, + options: Optional[StreamGroupOptions] = None, + ) -> TOK: + """ + Creates a new consumer group uniquely identified by `group_name` for the stream stored at `key`. + + See https://valkey.io/commands/xgroup-create for more details. + + Args: + key (str): The key of the stream. + group_name (str): The newly created consumer group name. + group_id (str): The stream entry ID that specifies the last delivered entry in the stream from the new + group’s perspective. The special ID "$" can be used to specify the last entry in the stream. + options (Optional[StreamGroupOptions]): Options for creating the stream group. + + Returns: + TOK: A simple "OK" response. + + Examples: + >>> await client.xgroup_create("mystream", "mygroup", "$", StreamGroupOptions(make_stream=True)) + OK + # Created the consumer group "mygroup" for the stream "mystream", which will track entries created after + # the most recent entry. The stream was created with length 0 if it did not already exist. + """ + args = [key, group_name, group_id] + if options is not None: + args.extend(options.to_args()) + + return cast( + TOK, + await self._execute_command(RequestType.XGroupCreate, args), + ) + + async def xgroup_destroy(self, key: str, group_name: str) -> bool: + """ + Destroys the consumer group `group_name` for the stream stored at `key`. + + See https://valkey.io/commands/xgroup-destroy for more details. + + Args: + key (str): The key of the stream. + group_name (str): The consumer group name to delete. + + Returns: + bool: True if the consumer group was destroyed. Otherwise, returns False. + + Examples: + >>> await client.xgroup_destroy("mystream", "mygroup") + True # The consumer group "mygroup" for stream "mystream" was destroyed. + """ + return cast( + bool, + await self._execute_command(RequestType.XGroupDestroy, [key, group_name]), + ) + async def geoadd( self, key: str, diff --git a/python/python/glide/async_commands/stream.py b/python/python/glide/async_commands/stream.py index 6d301d820a..ef793564e7 100644 --- a/python/python/glide/async_commands/stream.py +++ b/python/python/glide/async_commands/stream.py @@ -265,3 +265,40 @@ def to_args(self) -> List[str]: args.extend([self.READ_COUNT_REDIS_API, str(self.count)]) return args + + +class StreamGroupOptions: + MAKE_STREAM_REDIS_API = "MKSTREAM" + ENTRIES_READ_REDIS_API = "ENTRIESREAD" + + def __init__( + self, make_stream: bool = False, entries_read_id: Optional[str] = None + ): + """ + Options for creating stream consumer groups. Can be used as an optional argument to `XGROUP CREATE`. + + Args: + make_stream (bool): If set to True and the stream doesn't exist, this creates a new stream with a + length of 0. + entries_read_id: (Optional[str]): An arbitrary ID (that isn't the first ID, last ID, or the zero ID ("0-0")) + used to find out how many entries are between the arbitrary ID (excluding it) and the stream's last + entry. This option can only be specified if you are using Redis version 7.0.0 or above. + """ + self.make_stream = make_stream + self.entries_read_id = entries_read_id + + def to_args(self) -> List[str]: + """ + Returns the options as a list of string arguments to be used in the `XGROUP CREATE` command. + + Returns: + List[str]: The options as a list of arguments for the `XGROUP CREATE` command. + """ + args = [] + if self.make_stream is True: + args.append(self.MAKE_STREAM_REDIS_API) + + if self.entries_read_id is not None: + args.extend([self.ENTRIES_READ_REDIS_API, self.entries_read_id]) + + return args diff --git a/python/python/glide/async_commands/transaction.py b/python/python/glide/async_commands/transaction.py index b995f753a3..904e029bdf 100644 --- a/python/python/glide/async_commands/transaction.py +++ b/python/python/glide/async_commands/transaction.py @@ -44,6 +44,7 @@ ) from glide.async_commands.stream import ( StreamAddOptions, + StreamGroupOptions, StreamRangeBound, StreamReadOptions, StreamTrimOptions, @@ -1955,6 +1956,49 @@ def xread( return self.append_command(RequestType.XRead, args) + def xgroup_create( + self: TTransaction, + key: str, + group_name: str, + group_id: str, + options: Optional[StreamGroupOptions] = None, + ) -> TTransaction: + """ + Creates a new consumer group uniquely identified by `group_name` for the stream stored at `key`. + + See https://valkey.io/commands/xgroup-create for more details. + + Args: + key (str): The key of the stream. + group_name (str): The newly created consumer group name. + group_id (str): The stream entry ID that specifies the last delivered entry in the stream from the new + group’s perspective. The special ID "$" can be used to specify the last entry in the stream. + options (Optional[StreamGroupOptions]): Options for creating the stream group. + + Command response: + TOK: A simple "OK" response. + """ + args = [key, group_name, group_id] + if options is not None: + args.extend(options.to_args()) + + return self.append_command(RequestType.XGroupCreate, args) + + def xgroup_destroy(self: TTransaction, key: str, group_name: str) -> TTransaction: + """ + Destroys the consumer group `group_name` for the stream stored at `key`. + + See https://valkey.io/commands/xgroup-destroy for more details. + + Args: + key (str): The key of the stream. + group_name (str): The consumer group name to delete. + + Command response: + bool: True if the consumer group was destroyed. Otherwise, returns False. + """ + return self.append_command(RequestType.XGroupDestroy, [key, group_name]) + def geoadd( self: TTransaction, key: str, diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index 24407e4592..5c8e9def92 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -61,6 +61,7 @@ MaxId, MinId, StreamAddOptions, + StreamGroupOptions, StreamReadOptions, TrimByMaxLen, TrimByMinId, @@ -5072,6 +5073,86 @@ async def endless_xread_call(): with pytest.raises(RequestError): await redis_client.xread({key1: stream_id1, string_key: stream_id1}) + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_xgroup_create_xgroup_destroy( + self, redis_client: TRedisClient, cluster_mode, protocol, request + ): + key = get_random_string(10) + non_existing_key = get_random_string(10) + string_key = get_random_string(10) + group_name1 = get_random_string(10) + group_name2 = get_random_string(10) + stream_id = "0-1" + + # trying to create a consumer group for a non-existing stream without the "MKSTREAM" arg results in error + with pytest.raises(RequestError): + await redis_client.xgroup_create(non_existing_key, group_name1, stream_id) + + # calling with the "MKSTREAM" arg should create the new stream automatically + assert ( + await redis_client.xgroup_create( + key, group_name1, stream_id, StreamGroupOptions(make_stream=True) + ) + == OK + ) + + # invalid arg - group names must be unique, but group_name1 already exists + with pytest.raises(RequestError): + await redis_client.xgroup_create(key, group_name1, stream_id) + + # invalid stream ID format + with pytest.raises(RequestError): + await redis_client.xgroup_create( + key, group_name2, "invalid_stream_id_format" + ) + + assert await redis_client.xgroup_destroy(key, group_name1) is True + # calling xgroup_destroy again returns False because the group was already destroyed above + assert await redis_client.xgroup_destroy(key, group_name1) is False + + # attempting to destroy a group for a non-existing key should raise an error + with pytest.raises(RequestError): + await redis_client.xgroup_destroy(non_existing_key, group_name1) + + # "ENTRIESREAD" option was added in Redis 7.0.0 + if await check_if_server_version_lt(redis_client, "7.0.0"): + with pytest.raises(RequestError): + await redis_client.xgroup_create( + key, + group_name1, + stream_id, + StreamGroupOptions(entries_read_id="10"), + ) + else: + assert ( + await redis_client.xgroup_create( + key, + group_name1, + stream_id, + StreamGroupOptions(entries_read_id="10"), + ) + == OK + ) + + # invalid entries_read_id - cannot be the zero ("0-0") ID + with pytest.raises(RequestError): + await redis_client.xgroup_create( + key, + group_name2, + stream_id, + StreamGroupOptions(entries_read_id="0-0"), + ) + + # key exists, but it is not a stream + assert await redis_client.set(string_key, "foo") == OK + with pytest.raises(RequestError): + await redis_client.xgroup_create( + string_key, group_name1, stream_id, StreamGroupOptions(make_stream=True) + ) + with pytest.raises(RequestError): + await redis_client.xgroup_destroy(string_key, group_name1) + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_pfadd(self, redis_client: TRedisClient): diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index 23f5be1d30..ae0dae0f0f 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -37,7 +37,12 @@ ScoreBoundary, ScoreFilter, ) -from glide.async_commands.stream import IdBound, StreamAddOptions, TrimByMinId +from glide.async_commands.stream import ( + IdBound, + StreamAddOptions, + StreamGroupOptions, + TrimByMinId, +) from glide.async_commands.transaction import ( BaseTransaction, ClusterTransaction, @@ -482,6 +487,20 @@ async def transaction_test( args.append({"0-1": [["foo", "bar"]]}) transaction.xtrim(key11, TrimByMinId(threshold="0-2", exact=True)) args.append(1) + + group_name1 = get_random_string(10) + group_name2 = get_random_string(10) + transaction.xgroup_create(key11, group_name1, "0-0") + args.append(OK) + transaction.xgroup_create( + key11, group_name2, "0-0", StreamGroupOptions(make_stream=True) + ) + args.append(OK) + transaction.xgroup_destroy(key11, group_name1) + args.append(True) + transaction.xgroup_destroy(key11, group_name2) + args.append(True) + transaction.xdel(key11, ["0-2", "0-3"]) args.append(1)