diff --git a/CHANGELOG.md b/CHANGELOG.md index 365fa9f677..3e0ef8ea07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ * Python: Added PFADD command ([#1315](https://github.com/aws/glide-for-redis/pull/1315)) * Python: Added ZMSCORE command ([#1357](https://github.com/aws/glide-for-redis/pull/1357)) * Python: Added HRANDFIELD command ([#1334](https://github.com/aws/glide-for-redis/pull/1334)) +* Python: Added XADD, XTRIM commands ([#1320](https://github.com/aws/glide-for-redis/pull/1320)) #### Fixes * Python: Fix typing error "‘type’ object is not subscriptable" ([#1203](https://github.com/aws/glide-for-redis/pull/1203)) diff --git a/python/python/glide/__init__.py b/python/python/glide/__init__.py index b9c468d13f..6c65903bfb 100644 --- a/python/python/glide/__init__.py +++ b/python/python/glide/__init__.py @@ -9,6 +9,10 @@ GeoUnit, InfoSection, InsertPosition, + StreamAddOptions, + StreamTrimOptions, + TrimByMaxLen, + TrimByMinId, UpdateOptions, ) from glide.async_commands.redis_modules import json @@ -94,6 +98,10 @@ "RangeByIndex", "RangeByLex", "RangeByScore", + "StreamAddOptions", + "StreamTrimOptions", + "TrimByMaxLen", + "TrimByMinId", "UpdateOptions", # Logger "Logger", diff --git a/python/python/glide/async_commands/core.py b/python/python/glide/async_commands/core.py index f6dc886fc2..e8667c0696 100644 --- a/python/python/glide/async_commands/core.py +++ b/python/python/glide/async_commands/core.py @@ -1,5 +1,5 @@ # Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 - +from abc import ABC, abstractmethod from datetime import datetime, timedelta from enum import Enum from typing import ( @@ -149,6 +149,134 @@ def __init__(self, longitude: float, latitude: float): self.latitude = latitude +class StreamTrimOptions(ABC): + """ + Abstract base class for stream trim options. + """ + + @abstractmethod + def __init__( + self, + exact: bool, + threshold: Union[str, int], + method: str, + limit: Optional[int] = None, + ): + """ + Initialize stream trim options. + + Args: + exact (bool): If `true`, the stream will be trimmed exactly. + Otherwise the stream will be trimmed in a near-exact manner, which is more efficient. + threshold (Union[str, int]): Threshold for trimming. + method (str): Method for trimming (e.g., MINID, MAXLEN). + limit (Optional[int]): Max number of entries to be trimmed. Defaults to None. + Note: If `exact` is set to `True`, `limit` cannot be specified. + """ + if exact and limit: + raise ValueError( + "If `exact` is set to `True`, `limit` cannot be specified." + ) + self.exact = exact + self.threshold = threshold + self.method = method + self.limit = limit + + def to_args(self) -> List[str]: + """ + Convert options to arguments for Redis command. + + Returns: + List[str]: List of arguments for Redis command. + """ + option_args = [ + self.method, + "=" if self.exact else "~", + str(self.threshold), + ] + if self.limit is not None: + option_args.extend(["LIMIT", str(self.limit)]) + return option_args + + +class TrimByMinId(StreamTrimOptions): + """ + Stream trim option to trim by minimum ID. + """ + + def __init__(self, exact: bool, threshold: str, limit: Optional[int] = None): + """ + Initialize trim option by minimum ID. + + Args: + exact (bool): If `true`, the stream will be trimmed exactly. + Otherwise the stream will be trimmed in a near-exact manner, which is more efficient. + threshold (str): Threshold for trimming by minimum ID. + limit (Optional[int]): Max number of entries to be trimmed. Defaults to None. + Note: If `exact` is set to `True`, `limit` cannot be specified. + """ + super().__init__(exact, threshold, "MINID", limit) + + +class TrimByMaxLen(StreamTrimOptions): + """ + Stream trim option to trim by maximum length. + """ + + def __init__(self, exact: bool, threshold: int, limit: Optional[int] = None): + """ + Initialize trim option by maximum length. + + Args: + exact (bool): If `true`, the stream will be trimmed exactly. + Otherwise the stream will be trimmed in a near-exact manner, which is more efficient. + threshold (int): Threshold for trimming by maximum length. + limit (Optional[int]): Max number of entries to be trimmed. Defaults to None. + Note: If `exact` is set to `True`, `limit` cannot be specified. + """ + super().__init__(exact, threshold, "MAXLEN", limit) + + +class StreamAddOptions: + """ + Options for adding entries to a stream. + """ + + def __init__( + self, + id: Optional[str] = None, + make_stream: bool = True, + trim: Optional[StreamTrimOptions] = None, + ): + """ + Initialize stream add options. + + Args: + id (Optional[str]): ID for the new entry. If set, the new entry will be added with this ID. If not specified, '*' is used. + make_stream (bool, optional): If set to False, a new stream won't be created if no stream matches the given key. + trim (Optional[StreamTrimOptions]): If set, the add operation will also trim the older entries in the stream. See `StreamTrimOptions`. + """ + self.id = id + self.make_stream = make_stream + self.trim = trim + + def to_args(self) -> List[str]: + """ + Convert options to arguments for Redis command. + + Returns: + List[str]: List of arguments for Redis command. + """ + option_args = [] + if not self.make_stream: + option_args.append("NOMKSTREAM") + if self.trim: + option_args.extend(self.trim.to_args()) + option_args.append(self.id if self.id else "*") + + return option_args + + class GeoUnit(Enum): """ Enumeration representing distance units options for the `GEODIST` command. @@ -1675,6 +1803,70 @@ async def type(self, key: str) -> str: """ return cast(str, await self._execute_command(RequestType.Type, [key])) + async def xadd( + self, + key: str, + values: List[Tuple[str, str]], + options: Optional[StreamAddOptions] = None, + ) -> Optional[str]: + """ + Adds an entry to the specified stream stored at `key`. If the `key` doesn't exist, the stream is created. + + See https://valkey.io/commands/xadd for more details. + + Args: + key (str): The key of the stream. + values (List[Tuple[str, str]]): Field-value pairs to be added to the entry. + options (Optional[StreamAddOptions]): Additional options for adding entries to the stream. Default to None. sSee `StreamAddOptions`. + + Returns: + str: The id of the added entry, or None if `options.make_stream` is set to False and no stream with the matching `key` exists. + + Example: + >>> await client.xadd("mystream", [("field", "value"), ("field2", "value2")]) + "1615957011958-0" # Example stream entry ID. + >>> await client.xadd("non_existing_stream", [(field, "foo1"), (field2, "bar1")], StreamAddOptions(id="0-1", make_stream=False)) + None # The key doesn't exist, therefore, None is returned. + >>> await client.xadd("non_existing_stream", [(field, "foo1"), (field2, "bar1")], StreamAddOptions(id="0-1")) + "0-1" # Returns the stream id. + """ + args = [key] + if options: + args.extend(options.to_args()) + else: + args.append("*") + args.extend([field for pair in values for field in pair]) + + return cast(Optional[str], await self._execute_command(RequestType.XAdd, args)) + + async def xtrim( + self, + key: str, + options: StreamTrimOptions, + ) -> int: + """ + Trims the stream stored at `key` by evicting older entries. + + See https://valkey.io/commands/xtrim for more details. + + Args: + key (str): The key of the stream. + options (StreamTrimOptions): Options detailing how to trim the stream. See `StreamTrimOptions`. + + Returns: + int: TThe number of entries deleted from the stream. If `key` doesn't exist, 0 is returned. + + Example: + >>> await client.xadd("mystream", [("field", "value"), ("field2", "value2")], StreamAddOptions(id="0-1")) + >>> await client.xtrim("mystream", TrimByMinId(exact=True, threshold="0-2"))) + 1 # One entry was deleted from the stream. + """ + args = [key] + if options: + args.extend(options.to_args()) + + return cast(int, await self._execute_command(RequestType.XTrim, args)) + async def geoadd( self, key: str, diff --git a/python/python/glide/async_commands/transaction.py b/python/python/glide/async_commands/transaction.py index a90527e586..e91a52cce4 100644 --- a/python/python/glide/async_commands/transaction.py +++ b/python/python/glide/async_commands/transaction.py @@ -11,6 +11,8 @@ GeoUnit, InfoSection, InsertPosition, + StreamAddOptions, + StreamTrimOptions, UpdateOptions, ) from glide.async_commands.sorted_set import ( @@ -1248,6 +1250,55 @@ def type(self: TTransaction, key: str) -> TTransaction: """ return self.append_command(RequestType.Type, [key]) + def xadd( + self: TTransaction, + key: str, + values: List[Tuple[str, str]], + options: StreamAddOptions = StreamAddOptions(), + ) -> TTransaction: + """ + Adds an entry to the specified stream stored at `key`. If the `key` doesn't exist, the stream is created. + + See https://valkey.io/commands/xadd for more details. + + Args: + key (str): The key of the stream. + values (List[Tuple[str, str]]): Field-value pairs to be added to the entry. + options (Optional[StreamAddOptions]): Additional options for adding entries to the stream. Default to None. sSee `StreamAddOptions`. + + Commands response: + str: The id of the added entry, or None if `options.make_stream` is set to False and no stream with the matching `key` exists. + """ + args = [key] + if options: + args.extend(options.to_args()) + args.extend([field for pair in values for field in pair]) + + return self.append_command(RequestType.XAdd, args) + + def xtrim( + self: TTransaction, + key: str, + options: StreamTrimOptions, + ) -> TTransaction: + """ + Trims the stream stored at `key` by evicting older entries. + + See https://valkey.io/commands/xtrim for more details. + + Args: + key (str): The key of the stream. + options (StreamTrimOptions): Options detailing how to trim the stream. See `StreamTrimOptions`. + + Commands response: + int: TThe number of entries deleted from the stream. If `key` doesn't exist, 0 is returned. + """ + args = [key] + if options: + args.extend(options.to_args()) + + return self.append_command(RequestType.XTrim, args) + def geoadd( self: TTransaction, key: str, diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index eee211a238..9d3efdf43c 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -22,6 +22,9 @@ InfBound, InfoSection, InsertPosition, + StreamAddOptions, + TrimByMaxLen, + TrimByMinId, UpdateOptions, ) from glide.async_commands.sorted_set import ( @@ -2064,7 +2067,7 @@ async def test_type(self, redis_client: TRedisClient): assert (await redis_client.type(key)).lower() == "hash" assert await redis_client.delete([key]) == 1 - await redis_client.custom_command(["XADD", key, "*", "field", "value"]) + await redis_client.xadd(key, [("field", "value")]) assert await redis_client.type(key) == "stream" assert await redis_client.delete([key]) == 1 @@ -2123,6 +2126,57 @@ async def test_append(self, redis_client: TRedisClient): assert await redis_client.append(key, value) == 10 assert await redis_client.get(key) == value * 2 + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_xadd_xtrim(self, redis_client: TRedisClient): + key = get_random_string(10) + field, field2 = get_random_string(10), get_random_string(10) + + assert ( + await redis_client.xadd( + key, + [(field, "foo"), (field2, "bar")], + StreamAddOptions(make_stream=False), + ) + is None + ) + + assert ( + await redis_client.xadd( + key, [(field, "foo1"), (field2, "bar1")], StreamAddOptions(id="0-1") + ) + == "0-1" + ) + + assert ( + await redis_client.xadd(key, [(field, "foo2"), (field2, "bar2")]) + ) is not None + assert await redis_client.custom_command(["XLEN", key]) == 2 + + # This will trim the first entry. + id = await redis_client.xadd( + key, + [(field, "foo3"), (field2, "bar3")], + StreamAddOptions(trim=TrimByMaxLen(exact=True, threshold=2)), + ) + + assert id is not None + assert await redis_client.custom_command(["XLEN", key]) == 2 + + # This will trim the 2nd entry. + assert ( + await redis_client.xadd( + key, + [(field, "foo4"), (field2, "bar4")], + StreamAddOptions(trim=TrimByMinId(exact=True, threshold=str(id))), + ) + is not None + ) + assert await redis_client.custom_command(["XLEN", key]) == 2 + + assert await redis_client.xtrim(key, TrimByMaxLen(threshold=1, exact=True)) == 1 + assert await redis_client.custom_command(["XLEN", key]) == 1 + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_pfadd(self, redis_client: TRedisClient): diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index 24aba819ff..bf6227c5bf 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -5,7 +5,12 @@ import pytest from glide import RequestError -from glide.async_commands.core import GeospatialData, InsertPosition +from glide.async_commands.core import ( + GeospatialData, + InsertPosition, + StreamAddOptions, + TrimByMinId, +) from glide.async_commands.sorted_set import ( InfBound, LexBoundary, @@ -39,6 +44,7 @@ async def transaction_test( key8 = "{{{}}}:{}".format(keyslot, get_random_string(3)) key9 = "{{{}}}:{}".format(keyslot, get_random_string(3)) key10 = "{{{}}}:{}".format(keyslot, get_random_string(3)) # hyper log log + key11 = "{{{}}}:{}".format(keyslot, get_random_string(3)) # streams value = datetime.now(timezone.utc).strftime("%m/%d/%Y, %H:%M:%S") value2 = get_random_string(5) @@ -246,6 +252,12 @@ async def transaction_test( ] ) + transaction.xadd(key11, [("foo", "bar")], StreamAddOptions(id="0-1")) + args.append("0-1") + transaction.xadd(key11, [("foo", "bar")], StreamAddOptions(id="0-2")) + args.append("0-2") + transaction.xtrim(key11, TrimByMinId(threshold="0-2", exact=True)) + args.append(1) return args