Skip to content

Commit

Permalink
Python: add XGROUP CREATE and XGROUP DESTROY commands (#1646)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-congo authored Jun 25, 2024
1 parent f399dad commit 51ffa3b
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
* Python: Added COPY command ([#1626](https://github.com/aws/glide-for-redis/pull/1626))
* Python: Added XREVRANGE command ([#1625](https://github.com/aws/glide-for-redis/pull/1625))
* Python: Added XREAD command ([#1644](https://github.com/aws/glide-for-redis/pull/1644))
* Python: Added XGROUP CREATE and XGROUP DESTROY commands ([#1646](https://github.com/aws/glide-for-redis/pull/1646))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
2 changes: 2 additions & 0 deletions python/python/glide/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
MaxId,
MinId,
StreamAddOptions,
StreamGroupOptions,
StreamRangeBound,
StreamReadOptions,
StreamTrimOptions,
Expand Down Expand Up @@ -159,6 +160,7 @@
"MaxId",
"MinId",
"StreamAddOptions",
"StreamGroupOptions",
"StreamRangeBound",
"StreamReadOptions",
"StreamTrimOptions",
Expand Down
60 changes: 60 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
)
from glide.async_commands.stream import (
StreamAddOptions,
StreamGroupOptions,
StreamRangeBound,
StreamReadOptions,
StreamTrimOptions,
Expand Down Expand Up @@ -2802,6 +2803,65 @@ async def xread(
await self._execute_command(RequestType.XRead, args),
)

async def xgroup_create(
self,
key: str,
group_name: str,
group_id: str,
options: Optional[StreamGroupOptions] = None,
) -> TOK:
"""
Creates a new consumer group uniquely identified by `group_name` for the stream stored at `key`.
See https://valkey.io/commands/xgroup-create for more details.
Args:
key (str): The key of the stream.
group_name (str): The newly created consumer group name.
group_id (str): The stream entry ID that specifies the last delivered entry in the stream from the new
group’s perspective. The special ID "$" can be used to specify the last entry in the stream.
options (Optional[StreamGroupOptions]): Options for creating the stream group.
Returns:
TOK: A simple "OK" response.
Examples:
>>> await client.xgroup_create("mystream", "mygroup", "$", StreamGroupOptions(make_stream=True))
OK
# Created the consumer group "mygroup" for the stream "mystream", which will track entries created after
# the most recent entry. The stream was created with length 0 if it did not already exist.
"""
args = [key, group_name, group_id]
if options is not None:
args.extend(options.to_args())

return cast(
TOK,
await self._execute_command(RequestType.XGroupCreate, args),
)

async def xgroup_destroy(self, key: str, group_name: str) -> bool:
"""
Destroys the consumer group `group_name` for the stream stored at `key`.
See https://valkey.io/commands/xgroup-destroy for more details.
Args:
key (str): The key of the stream.
group_name (str): The consumer group name to delete.
Returns:
bool: True if the consumer group was destroyed. Otherwise, returns False.
Examples:
>>> await client.xgroup_destroy("mystream", "mygroup")
True # The consumer group "mygroup" for stream "mystream" was destroyed.
"""
return cast(
bool,
await self._execute_command(RequestType.XGroupDestroy, [key, group_name]),
)

async def geoadd(
self,
key: str,
Expand Down
37 changes: 37 additions & 0 deletions python/python/glide/async_commands/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,40 @@ def to_args(self) -> List[str]:
args.extend([self.READ_COUNT_REDIS_API, str(self.count)])

return args


class StreamGroupOptions:
MAKE_STREAM_REDIS_API = "MKSTREAM"
ENTRIES_READ_REDIS_API = "ENTRIESREAD"

def __init__(
self, make_stream: bool = False, entries_read_id: Optional[str] = None
):
"""
Options for creating stream consumer groups. Can be used as an optional argument to `XGROUP CREATE`.
Args:
make_stream (bool): If set to True and the stream doesn't exist, this creates a new stream with a
length of 0.
entries_read_id: (Optional[str]): An arbitrary ID (that isn't the first ID, last ID, or the zero ID ("0-0"))
used to find out how many entries are between the arbitrary ID (excluding it) and the stream's last
entry. This option can only be specified if you are using Redis version 7.0.0 or above.
"""
self.make_stream = make_stream
self.entries_read_id = entries_read_id

def to_args(self) -> List[str]:
"""
Returns the options as a list of string arguments to be used in the `XGROUP CREATE` command.
Returns:
List[str]: The options as a list of arguments for the `XGROUP CREATE` command.
"""
args = []
if self.make_stream is True:
args.append(self.MAKE_STREAM_REDIS_API)

if self.entries_read_id is not None:
args.extend([self.ENTRIES_READ_REDIS_API, self.entries_read_id])

return args
44 changes: 44 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
)
from glide.async_commands.stream import (
StreamAddOptions,
StreamGroupOptions,
StreamRangeBound,
StreamReadOptions,
StreamTrimOptions,
Expand Down Expand Up @@ -1955,6 +1956,49 @@ def xread(

return self.append_command(RequestType.XRead, args)

def xgroup_create(
self: TTransaction,
key: str,
group_name: str,
group_id: str,
options: Optional[StreamGroupOptions] = None,
) -> TTransaction:
"""
Creates a new consumer group uniquely identified by `group_name` for the stream stored at `key`.
See https://valkey.io/commands/xgroup-create for more details.
Args:
key (str): The key of the stream.
group_name (str): The newly created consumer group name.
group_id (str): The stream entry ID that specifies the last delivered entry in the stream from the new
group’s perspective. The special ID "$" can be used to specify the last entry in the stream.
options (Optional[StreamGroupOptions]): Options for creating the stream group.
Command response:
TOK: A simple "OK" response.
"""
args = [key, group_name, group_id]
if options is not None:
args.extend(options.to_args())

return self.append_command(RequestType.XGroupCreate, args)

def xgroup_destroy(self: TTransaction, key: str, group_name: str) -> TTransaction:
"""
Destroys the consumer group `group_name` for the stream stored at `key`.
See https://valkey.io/commands/xgroup-destroy for more details.
Args:
key (str): The key of the stream.
group_name (str): The consumer group name to delete.
Command response:
bool: True if the consumer group was destroyed. Otherwise, returns False.
"""
return self.append_command(RequestType.XGroupDestroy, [key, group_name])

def geoadd(
self: TTransaction,
key: str,
Expand Down
81 changes: 81 additions & 0 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
MaxId,
MinId,
StreamAddOptions,
StreamGroupOptions,
StreamReadOptions,
TrimByMaxLen,
TrimByMinId,
Expand Down Expand Up @@ -5072,6 +5073,86 @@ async def endless_xread_call():
with pytest.raises(RequestError):
await redis_client.xread({key1: stream_id1, string_key: stream_id1})

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_xgroup_create_xgroup_destroy(
self, redis_client: TRedisClient, cluster_mode, protocol, request
):
key = get_random_string(10)
non_existing_key = get_random_string(10)
string_key = get_random_string(10)
group_name1 = get_random_string(10)
group_name2 = get_random_string(10)
stream_id = "0-1"

# trying to create a consumer group for a non-existing stream without the "MKSTREAM" arg results in error
with pytest.raises(RequestError):
await redis_client.xgroup_create(non_existing_key, group_name1, stream_id)

# calling with the "MKSTREAM" arg should create the new stream automatically
assert (
await redis_client.xgroup_create(
key, group_name1, stream_id, StreamGroupOptions(make_stream=True)
)
== OK
)

# invalid arg - group names must be unique, but group_name1 already exists
with pytest.raises(RequestError):
await redis_client.xgroup_create(key, group_name1, stream_id)

# invalid stream ID format
with pytest.raises(RequestError):
await redis_client.xgroup_create(
key, group_name2, "invalid_stream_id_format"
)

assert await redis_client.xgroup_destroy(key, group_name1) is True
# calling xgroup_destroy again returns False because the group was already destroyed above
assert await redis_client.xgroup_destroy(key, group_name1) is False

# attempting to destroy a group for a non-existing key should raise an error
with pytest.raises(RequestError):
await redis_client.xgroup_destroy(non_existing_key, group_name1)

# "ENTRIESREAD" option was added in Redis 7.0.0
if await check_if_server_version_lt(redis_client, "7.0.0"):
with pytest.raises(RequestError):
await redis_client.xgroup_create(
key,
group_name1,
stream_id,
StreamGroupOptions(entries_read_id="10"),
)
else:
assert (
await redis_client.xgroup_create(
key,
group_name1,
stream_id,
StreamGroupOptions(entries_read_id="10"),
)
== OK
)

# invalid entries_read_id - cannot be the zero ("0-0") ID
with pytest.raises(RequestError):
await redis_client.xgroup_create(
key,
group_name2,
stream_id,
StreamGroupOptions(entries_read_id="0-0"),
)

# key exists, but it is not a stream
assert await redis_client.set(string_key, "foo") == OK
with pytest.raises(RequestError):
await redis_client.xgroup_create(
string_key, group_name1, stream_id, StreamGroupOptions(make_stream=True)
)
with pytest.raises(RequestError):
await redis_client.xgroup_destroy(string_key, group_name1)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_pfadd(self, redis_client: TRedisClient):
Expand Down
21 changes: 20 additions & 1 deletion python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@
ScoreBoundary,
ScoreFilter,
)
from glide.async_commands.stream import IdBound, StreamAddOptions, TrimByMinId
from glide.async_commands.stream import (
IdBound,
StreamAddOptions,
StreamGroupOptions,
TrimByMinId,
)
from glide.async_commands.transaction import (
BaseTransaction,
ClusterTransaction,
Expand Down Expand Up @@ -482,6 +487,20 @@ async def transaction_test(
args.append({"0-1": [["foo", "bar"]]})
transaction.xtrim(key11, TrimByMinId(threshold="0-2", exact=True))
args.append(1)

group_name1 = get_random_string(10)
group_name2 = get_random_string(10)
transaction.xgroup_create(key11, group_name1, "0-0")
args.append(OK)
transaction.xgroup_create(
key11, group_name2, "0-0", StreamGroupOptions(make_stream=True)
)
args.append(OK)
transaction.xgroup_destroy(key11, group_name1)
args.append(True)
transaction.xgroup_destroy(key11, group_name2)
args.append(True)

transaction.xdel(key11, ["0-2", "0-3"])
args.append(1)

Expand Down

0 comments on commit 51ffa3b

Please sign in to comment.