Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: adds XADD, XTRIM commands #1320

Merged
merged 6 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* Python: Added PFADD command ([#1315](https://github.com/aws/glide-for-redis/pull/1315))
* Python: Added ZMSCORE command ([#1357](https://github.com/aws/glide-for-redis/pull/1357))
* Python: Added HRANDFIELD command ([#1334](https://github.com/aws/glide-for-redis/pull/1334))
* Python: Added XADD, XTRIM commands ([#1320](https://github.com/aws/glide-for-redis/pull/1320))

#### Fixes
* Python: Fix typing error "‘type’ object is not subscriptable" ([#1203](https://github.com/aws/glide-for-redis/pull/1203))
Expand Down
8 changes: 8 additions & 0 deletions python/python/glide/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
GeoUnit,
InfoSection,
InsertPosition,
StreamAddOptions,
StreamTrimOptions,
TrimByMaxLen,
TrimByMinId,
UpdateOptions,
)
from glide.async_commands.redis_modules import json
Expand Down Expand Up @@ -94,6 +98,10 @@
"RangeByIndex",
"RangeByLex",
"RangeByScore",
"StreamAddOptions",
"StreamTrimOptions",
"TrimByMaxLen",
"TrimByMinId",
"UpdateOptions",
# Logger
"Logger",
Expand Down
194 changes: 193 additions & 1 deletion python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0

from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from enum import Enum
from typing import (
Expand Down Expand Up @@ -149,6 +149,134 @@ def __init__(self, longitude: float, latitude: float):
self.latitude = latitude


class StreamTrimOptions(ABC):
"""
Abstract base class for stream trim options.
"""

@abstractmethod
def __init__(
self,
exact: bool,
threshold: Union[str, int],
method: str,
limit: Optional[int] = None,
):
"""
Initialize stream trim options.

Args:
exact (bool): If `true`, the stream will be trimmed exactly.
Otherwise the stream will be trimmed in a near-exact manner, which is more efficient.
threshold (Union[str, int]): Threshold for trimming.
method (str): Method for trimming (e.g., MINID, MAXLEN).
limit (Optional[int]): Max number of entries to be trimmed. Defaults to None.
Note: If `exact` is set to `True`, `limit` cannot be specified.
"""
if exact and limit:
raise ValueError(
"If `exact` is set to `True`, `limit` cannot be specified."
)
self.exact = exact
self.threshold = threshold
self.method = method
self.limit = limit

def to_args(self) -> List[str]:
"""
Convert options to arguments for Redis command.

Returns:
List[str]: List of arguments for Redis command.
"""
option_args = [
self.method,
"=" if self.exact else "~",
str(self.threshold),
]
if self.limit is not None:
option_args.extend(["LIMIT", str(self.limit)])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make LIMIT, MINID and other keywords constants

return option_args


class TrimByMinId(StreamTrimOptions):
"""
Stream trim option to trim by minimum ID.
"""

def __init__(self, exact: bool, threshold: str, limit: Optional[int] = None):
"""
Initialize trim option by minimum ID.

Args:
exact (bool): If `true`, the stream will be trimmed exactly.
Otherwise the stream will be trimmed in a near-exact manner, which is more efficient.
threshold (str): Threshold for trimming by minimum ID.
limit (Optional[int]): Max number of entries to be trimmed. Defaults to None.
Note: If `exact` is set to `True`, `limit` cannot be specified.
"""
super().__init__(exact, threshold, "MINID", limit)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

limit cannot be set when exact is set to true. We should consider a builder that sets either exact OR limit - not both.



class TrimByMaxLen(StreamTrimOptions):
"""
Stream trim option to trim by maximum length.
"""

def __init__(self, exact: bool, threshold: int, limit: Optional[int] = None):
"""
Initialize trim option by maximum length.

Args:
exact (bool): If `true`, the stream will be trimmed exactly.
Otherwise the stream will be trimmed in a near-exact manner, which is more efficient.
threshold (int): Threshold for trimming by maximum length.
limit (Optional[int]): Max number of entries to be trimmed. Defaults to None.
Note: If `exact` is set to `True`, `limit` cannot be specified.
"""
super().__init__(exact, threshold, "MAXLEN", limit)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

limit cannot be set when exact is set to true. We should consider a builder that sets either exact OR limit - not both.



class StreamAddOptions:
"""
Options for adding entries to a stream.
"""

def __init__(
self,
id: Optional[str] = None,
make_stream: bool = True,
trim: Optional[StreamTrimOptions] = None,
):
"""
Initialize stream add options.

Args:
id (Optional[str]): ID for the new entry. If set, the new entry will be added with this ID. If not specified, '*' is used.
make_stream (bool, optional): If set to False, a new stream won't be created if no stream matches the given key.
trim (Optional[StreamTrimOptions]): If set, the add operation will also trim the older entries in the stream. See `StreamTrimOptions`.
"""
self.id = id
self.make_stream = make_stream
self.trim = trim

def to_args(self) -> List[str]:
"""
Convert options to arguments for Redis command.

Returns:
List[str]: List of arguments for Redis command.
"""
option_args = []
if not self.make_stream:
option_args.append("NOMKSTREAM")
if self.trim:
option_args.extend(self.trim.to_args())
option_args.append(self.id if self.id else "*")

return option_args


class GeoUnit(Enum):
"""
Enumeration representing distance units options for the `GEODIST` command.
Expand Down Expand Up @@ -1675,6 +1803,70 @@ async def type(self, key: str) -> str:
"""
return cast(str, await self._execute_command(RequestType.Type, [key]))

async def xadd(
self,
key: str,
values: List[Tuple[str, str]],
options: Optional[StreamAddOptions] = None,
) -> Optional[str]:
"""
Adds an entry to the specified stream stored at `key`. If the `key` doesn't exist, the stream is created.

See https://valkey.io/commands/xadd for more details.

Args:
key (str): The key of the stream.
values (List[Tuple[str, str]]): Field-value pairs to be added to the entry.
options (Optional[StreamAddOptions]): Additional options for adding entries to the stream. Default to None. sSee `StreamAddOptions`.

Returns:
str: The id of the added entry, or None if `options.make_stream` is set to False and no stream with the matching `key` exists.

Example:
>>> await client.xadd("mystream", [("field", "value"), ("field2", "value2")])
"1615957011958-0" # Example stream entry ID.
>>> await client.xadd("non_existing_stream", [(field, "foo1"), (field2, "bar1")], StreamAddOptions(id="0-1", make_stream=False))
None # The key doesn't exist, therefore, None is returned.
>>> await client.xadd("non_existing_stream", [(field, "foo1"), (field2, "bar1")], StreamAddOptions(id="0-1"))
"0-1" # Returns the stream id.
"""
args = [key]
if options:
args.extend(options.to_args())
else:
args.append("*")
args.extend([field for pair in values for field in pair])

return cast(Optional[str], await self._execute_command(RequestType.XAdd, args))

async def xtrim(
self,
key: str,
options: StreamTrimOptions,
) -> int:
"""
Trims the stream stored at `key` by evicting older entries.

See https://valkey.io/commands/xtrim for more details.

Args:
key (str): The key of the stream.
options (StreamTrimOptions): Options detailing how to trim the stream. See `StreamTrimOptions`.

Returns:
int: TThe number of entries deleted from the stream. If `key` doesn't exist, 0 is returned.

Example:
>>> await client.xadd("mystream", [("field", "value"), ("field2", "value2")], StreamAddOptions(id="0-1"))
>>> await client.xtrim("mystream", TrimByMinId(exact=True, threshold="0-2")))
1 # One entry was deleted from the stream.
"""
args = [key]
if options:
args.extend(options.to_args())

return cast(int, await self._execute_command(RequestType.XTrim, args))

async def geoadd(
self,
key: str,
Expand Down
51 changes: 51 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
GeoUnit,
InfoSection,
InsertPosition,
StreamAddOptions,
StreamTrimOptions,
UpdateOptions,
)
from glide.async_commands.sorted_set import (
Expand Down Expand Up @@ -1248,6 +1250,55 @@ def type(self: TTransaction, key: str) -> TTransaction:
"""
return self.append_command(RequestType.Type, [key])

def xadd(
self: TTransaction,
key: str,
values: List[Tuple[str, str]],
options: StreamAddOptions = StreamAddOptions(),
) -> TTransaction:
"""
Adds an entry to the specified stream stored at `key`. If the `key` doesn't exist, the stream is created.

See https://valkey.io/commands/xadd for more details.

Args:
key (str): The key of the stream.
values (List[Tuple[str, str]]): Field-value pairs to be added to the entry.
options (Optional[StreamAddOptions]): Additional options for adding entries to the stream. Default to None. sSee `StreamAddOptions`.

Commands response:
str: The id of the added entry, or None if `options.make_stream` is set to False and no stream with the matching `key` exists.
"""
args = [key]
if options:
args.extend(options.to_args())
args.extend([field for pair in values for field in pair])

return self.append_command(RequestType.XAdd, args)

def xtrim(
self: TTransaction,
key: str,
options: StreamTrimOptions,
) -> TTransaction:
"""
Trims the stream stored at `key` by evicting older entries.

See https://valkey.io/commands/xtrim for more details.

Args:
key (str): The key of the stream.
options (StreamTrimOptions): Options detailing how to trim the stream. See `StreamTrimOptions`.

Commands response:
int: TThe number of entries deleted from the stream. If `key` doesn't exist, 0 is returned.
shohamazon marked this conversation as resolved.
Show resolved Hide resolved
"""
args = [key]
if options:
args.extend(options.to_args())

return self.append_command(RequestType.XTrim, args)

def geoadd(
self: TTransaction,
key: str,
Expand Down
56 changes: 55 additions & 1 deletion python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
InfBound,
InfoSection,
InsertPosition,
StreamAddOptions,
TrimByMaxLen,
TrimByMinId,
UpdateOptions,
)
from glide.async_commands.sorted_set import (
Expand Down Expand Up @@ -2064,7 +2067,7 @@ async def test_type(self, redis_client: TRedisClient):
assert (await redis_client.type(key)).lower() == "hash"
assert await redis_client.delete([key]) == 1

await redis_client.custom_command(["XADD", key, "*", "field", "value"])
await redis_client.xadd(key, [("field", "value")])
assert await redis_client.type(key) == "stream"
assert await redis_client.delete([key]) == 1

Expand Down Expand Up @@ -2123,6 +2126,57 @@ async def test_append(self, redis_client: TRedisClient):
assert await redis_client.append(key, value) == 10
assert await redis_client.get(key) == value * 2

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_xadd_xtrim(self, redis_client: TRedisClient):
key = get_random_string(10)
field, field2 = get_random_string(10), get_random_string(10)

assert (
await redis_client.xadd(
key,
[(field, "foo"), (field2, "bar")],
StreamAddOptions(make_stream=False),
)
is None
)

assert (
await redis_client.xadd(
key, [(field, "foo1"), (field2, "bar1")], StreamAddOptions(id="0-1")
)
== "0-1"
)

assert (
await redis_client.xadd(key, [(field, "foo2"), (field2, "bar2")])
) is not None
assert await redis_client.custom_command(["XLEN", key]) == 2

# This will trim the first entry.
id = await redis_client.xadd(
key,
[(field, "foo3"), (field2, "bar3")],
StreamAddOptions(trim=TrimByMaxLen(exact=True, threshold=2)),
)

assert id is not None
assert await redis_client.custom_command(["XLEN", key]) == 2
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add TODO to replace with XLEN implementation


# This will trim the 2nd entry.
assert (
await redis_client.xadd(
key,
[(field, "foo4"), (field2, "bar4")],
StreamAddOptions(trim=TrimByMinId(exact=True, threshold=str(id))),
)
is not None
)
assert await redis_client.custom_command(["XLEN", key]) == 2
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a TODO to replace with XLEN implementation


assert await redis_client.xtrim(key, TrimByMaxLen(threshold=1, exact=True)) == 1
assert await redis_client.custom_command(["XLEN", key]) == 1

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_pfadd(self, redis_client: TRedisClient):
Expand Down
Loading
Loading