Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: add XGROUP CREATE and XGROUP DESTROY commands #1646

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
* Python: Added COPY command ([#1626](https://github.com/aws/glide-for-redis/pull/1626))
* Python: Added XREVRANGE command ([#1625](https://github.com/aws/glide-for-redis/pull/1625))
* Python: Added XREAD command ([#1644](https://github.com/aws/glide-for-redis/pull/1644))
* Python: Added XGROUP CREATE and XGROUP DESTROY commands ([#1646](https://github.com/aws/glide-for-redis/pull/1646))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
2 changes: 2 additions & 0 deletions python/python/glide/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
MaxId,
MinId,
StreamAddOptions,
StreamGroupOptions,
StreamRangeBound,
StreamReadOptions,
StreamTrimOptions,
Expand Down Expand Up @@ -159,6 +160,7 @@
"MaxId",
"MinId",
"StreamAddOptions",
"StreamGroupOptions",
"StreamRangeBound",
"StreamReadOptions",
"StreamTrimOptions",
Expand Down
60 changes: 60 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
)
from glide.async_commands.stream import (
StreamAddOptions,
StreamGroupOptions,
StreamRangeBound,
StreamReadOptions,
StreamTrimOptions,
Expand Down Expand Up @@ -2802,6 +2803,65 @@ async def xread(
await self._execute_command(RequestType.XRead, args),
)

async def xgroup_create(
self,
key: str,
group_name: str,
group_id: str,
options: Optional[StreamGroupOptions] = None,
) -> TOK:
"""
Creates a new consumer group uniquely identified by `group_name` for the stream stored at `key`.

See https://valkey.io/commands/xgroup-create for more details.

Args:
key (str): The key of the stream.
group_name (str): The newly created consumer group name.
group_id (str): The stream entry ID that specifies the last delivered entry in the stream from the new
group’s perspective. The special ID "$" can be used to specify the last entry in the stream.
options (Optional[StreamGroupOptions]): Options for creating the stream group.

Returns:
TOK: A simple "OK" response.

Examples:
>>> await client.xgroup_create("mystream", "mygroup", "$", StreamGroupOptions(make_stream=True))
OK
# Created the consumer group "mygroup" for the stream "mystream", which will track entries created after
# the most recent entry. The stream was created with length 0 if it did not already exist.
"""
args = [key, group_name, group_id]
if options is not None:
args.extend(options.to_args())

return cast(
TOK,
await self._execute_command(RequestType.XGroupCreate, args),
)

async def xgroup_destroy(self, key: str, group_name: str) -> bool:
"""
Destroys the consumer group `group_name` for the stream stored at `key`.

See https://valkey.io/commands/xgroup-destroy for more details.

Args:
key (str): The key of the stream.
group_name (str): The consumer group name to delete.

Returns:
bool: True if the consumer group was destroyed. Otherwise, returns False.

Examples:
>>> await client.xgroup_destroy("mystream", "mygroup")
True # The consumer group "mygroup" for stream "mystream" was destroyed.
"""
return cast(
bool,
await self._execute_command(RequestType.XGroupDestroy, [key, group_name]),
)

async def geoadd(
self,
key: str,
Expand Down
37 changes: 37 additions & 0 deletions python/python/glide/async_commands/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,40 @@ def to_args(self) -> List[str]:
args.extend([self.READ_COUNT_REDIS_API, str(self.count)])

return args


class StreamGroupOptions:
MAKE_STREAM_REDIS_API = "MKSTREAM"
ENTRIES_READ_REDIS_API = "ENTRIESREAD"

def __init__(
self, make_stream: bool = False, entries_read_id: Optional[str] = None
):
"""
Options for creating stream consumer groups. Can be used as an optional argument to `XGROUP CREATE`.

Args:
make_stream (bool): If set to True and the stream doesn't exist, this creates a new stream with a
length of 0.
entries_read_id: (Optional[str]): An arbitrary ID (that isn't the first ID, last ID, or the zero ID ("0-0"))
used to find out how many entries are between the arbitrary ID (excluding it) and the stream's last
entry. This option can only be specified if you are using Redis version 7.0.0 or above.
"""
self.make_stream = make_stream
self.entries_read_id = entries_read_id

def to_args(self) -> List[str]:
"""
Returns the options as a list of string arguments to be used in the `XGROUP CREATE` command.

Returns:
List[str]: The options as a list of arguments for the `XGROUP CREATE` command.
"""
args = []
if self.make_stream is True:
args.append(self.MAKE_STREAM_REDIS_API)

if self.entries_read_id is not None:
args.extend([self.ENTRIES_READ_REDIS_API, self.entries_read_id])

return args
44 changes: 44 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
)
from glide.async_commands.stream import (
StreamAddOptions,
StreamGroupOptions,
StreamRangeBound,
StreamReadOptions,
StreamTrimOptions,
Expand Down Expand Up @@ -1955,6 +1956,49 @@ def xread(

return self.append_command(RequestType.XRead, args)

def xgroup_create(
self: TTransaction,
key: str,
group_name: str,
group_id: str,
options: Optional[StreamGroupOptions] = None,
) -> TTransaction:
"""
Creates a new consumer group uniquely identified by `group_name` for the stream stored at `key`.

See https://valkey.io/commands/xgroup-create for more details.

Args:
key (str): The key of the stream.
group_name (str): The newly created consumer group name.
group_id (str): The stream entry ID that specifies the last delivered entry in the stream from the new
group’s perspective. The special ID "$" can be used to specify the last entry in the stream.
options (Optional[StreamGroupOptions]): Options for creating the stream group.

Command response:
TOK: A simple "OK" response.
"""
args = [key, group_name, group_id]
if options is not None:
args.extend(options.to_args())

return self.append_command(RequestType.XGroupCreate, args)

def xgroup_destroy(self: TTransaction, key: str, group_name: str) -> TTransaction:
"""
Destroys the consumer group `group_name` for the stream stored at `key`.

See https://valkey.io/commands/xgroup-destroy for more details.

Args:
key (str): The key of the stream.
group_name (str): The consumer group name to delete.

Command response:
bool: True if the consumer group was destroyed. Otherwise, returns False.
"""
return self.append_command(RequestType.XGroupDestroy, [key, group_name])

def geoadd(
self: TTransaction,
key: str,
Expand Down
81 changes: 81 additions & 0 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
MaxId,
MinId,
StreamAddOptions,
StreamGroupOptions,
StreamReadOptions,
TrimByMaxLen,
TrimByMinId,
Expand Down Expand Up @@ -5072,6 +5073,86 @@ async def endless_xread_call():
with pytest.raises(RequestError):
await redis_client.xread({key1: stream_id1, string_key: stream_id1})

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_xgroup_create_xgroup_destroy(
self, redis_client: TRedisClient, cluster_mode, protocol, request
):
key = get_random_string(10)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In XREAD, you use key1 = f"{{testKey}}:1-{get_random_string(10)}".

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do find prefixing the keys with something unique to the test is helpful for debugging.
Same for group_name, etc.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's because XREAD is a multi-key command, so the keys need to have the same hash tag in order to be mapped to the same hash slot. XGROUP CREATE/DESTROY are single-key commands so they do not have this requirement.

non_existing_key = get_random_string(10)
string_key = get_random_string(10)
group_name1 = get_random_string(10)
group_name2 = get_random_string(10)
stream_id = "0-1"

# trying to create a consumer group for a non-existing stream without the "MKSTREAM" arg results in error
with pytest.raises(RequestError):
await redis_client.xgroup_create(non_existing_key, group_name1, stream_id)

# calling with the "MKSTREAM" arg should create the new stream automatically
assert (
await redis_client.xgroup_create(
key, group_name1, stream_id, StreamGroupOptions(make_stream=True)
)
== OK
)

# invalid arg - group names must be unique, but group_name1 already exists
with pytest.raises(RequestError):
await redis_client.xgroup_create(key, group_name1, stream_id)

# invalid stream ID format
with pytest.raises(RequestError):
await redis_client.xgroup_create(
key, group_name2, "invalid_stream_id_format"
)

assert await redis_client.xgroup_destroy(key, group_name1) is True
# calling xgroup_destroy again returns False because the group was already destroyed above
assert await redis_client.xgroup_destroy(key, group_name1) is False

# attempting to destroy a group for a non-existing key should raise an error
with pytest.raises(RequestError):
await redis_client.xgroup_destroy(non_existing_key, group_name1)

# "ENTRIESREAD" option was added in Redis 7.0.0
if await check_if_server_version_lt(redis_client, "7.0.0"):
with pytest.raises(RequestError):
await redis_client.xgroup_create(
key,
group_name1,
stream_id,
StreamGroupOptions(entries_read_id="10"),
)
else:
assert (
await redis_client.xgroup_create(
key,
group_name1,
stream_id,
StreamGroupOptions(entries_read_id="10"),
)
== OK
)

# invalid entries_read_id - cannot be the zero ("0-0") ID
with pytest.raises(RequestError):
await redis_client.xgroup_create(
key,
group_name2,
stream_id,
StreamGroupOptions(entries_read_id="0-0"),
)

# key exists, but it is not a stream
assert await redis_client.set(string_key, "foo") == OK
with pytest.raises(RequestError):
await redis_client.xgroup_create(
string_key, group_name1, stream_id, StreamGroupOptions(make_stream=True)
)
with pytest.raises(RequestError):
await redis_client.xgroup_destroy(string_key, group_name1)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_pfadd(self, redis_client: TRedisClient):
Expand Down
21 changes: 20 additions & 1 deletion python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@
ScoreBoundary,
ScoreFilter,
)
from glide.async_commands.stream import IdBound, StreamAddOptions, TrimByMinId
from glide.async_commands.stream import (
IdBound,
StreamAddOptions,
StreamGroupOptions,
TrimByMinId,
)
from glide.async_commands.transaction import (
BaseTransaction,
ClusterTransaction,
Expand Down Expand Up @@ -482,6 +487,20 @@ async def transaction_test(
args.append({"0-1": [["foo", "bar"]]})
transaction.xtrim(key11, TrimByMinId(threshold="0-2", exact=True))
args.append(1)

group_name1 = get_random_string(10)
group_name2 = get_random_string(10)
transaction.xgroup_create(key11, group_name1, "0-0")
args.append(OK)
transaction.xgroup_create(
key11, group_name2, "0-0", StreamGroupOptions(make_stream=True)
)
args.append(OK)
transaction.xgroup_destroy(key11, group_name1)
args.append(True)
transaction.xgroup_destroy(key11, group_name2)
args.append(True)

transaction.xdel(key11, ["0-2", "0-3"])
args.append(1)

Expand Down
Loading