Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: add XREADGROUP command #1679

Merged
merged 3 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
* Python: Added XGROUP CREATE and XGROUP DESTROY commands ([#1646](https://github.com/aws/glide-for-redis/pull/1646))
* Python: Added XGROUP CREATECONSUMER and XGROUP DELCONSUMER commands ([#1658](https://github.com/aws/glide-for-redis/pull/1658))
* Python: Added LOLWUT command ([#1657](https://github.com/aws/glide-for-redis/pull/1657))
* Python: Added XREADGROUP command ([#1679](https://github.com/aws/glide-for-redis/pull/1679))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
2 changes: 2 additions & 0 deletions python/python/glide/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
StreamAddOptions,
StreamGroupOptions,
StreamRangeBound,
StreamReadGroupOptions,
StreamReadOptions,
StreamTrimOptions,
TrimByMaxLen,
Expand Down Expand Up @@ -161,6 +162,7 @@
"MinId",
"StreamAddOptions",
"StreamGroupOptions",
"StreamReadGroupOptions",
"StreamRangeBound",
"StreamReadOptions",
"StreamTrimOptions",
Expand Down
52 changes: 52 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
StreamAddOptions,
StreamGroupOptions,
StreamRangeBound,
StreamReadGroupOptions,
StreamReadOptions,
StreamTrimOptions,
)
Expand Down Expand Up @@ -2916,6 +2917,57 @@ async def xgroup_del_consumer(
),
)

async def xreadgroup(
self,
keys_and_ids: Mapping[str, str],
group_name: str,
consumer_name: str,
options: Optional[StreamReadGroupOptions] = None,
) -> Optional[Mapping[str, Mapping[str, Optional[List[List[str]]]]]]:
"""
Reads entries from the given streams owned by a consumer group.

See https://valkey.io/commands/xreadgroup for more details.

Note:
When in cluster mode, all keys in `keys_and_ids` must map to the same hash slot.

Args:
keys_and_ids (Mapping[str, str]): A mapping of stream keys to stream entry IDs to read from. The special ">"
ID returns messages that were never delivered to any other consumer. Any other valid ID will return
entries pending for the consumer with IDs greater than the one provided.
group_name (str): The consumer group name.
consumer_name (str): The consumer name. The consumer will be auto-created if it does not already exist.
options (Optional[StreamReadGroupOptions]): Options detailing how to read the stream.

Returns:
Optional[Mapping[str, Mapping[str, Optional[List[List[str]]]]]]: A mapping of stream keys, to a mapping of
stream IDs, to a list of pairings with format `[[field, entry], [field, entry], ...]`.
Returns None if the BLOCK option is given and a timeout occurs, or if there is no stream that can be served.

Examples:
>>> await client.xadd("mystream", [("field1", "value1")], StreamAddOptions(id="1-0"))
>>> await client.xgroup_create("mystream", "mygroup", "0-0")
>>> await client.xreadgroup({"mystream": ">"}, "mygroup", "myconsumer", StreamReadGroupOptions(count=1))
{
"mystream": {
"1-0": [["field1", "value1"]],
}
} # Read one stream entry from "mystream" using "myconsumer" in the consumer group "mygroup".
"""
args = ["GROUP", group_name, consumer_name]
if options is not None:
args.extend(options.to_args())

args.append("STREAMS")
args.extend([key for key in keys_and_ids.keys()])
args.extend([value for value in keys_and_ids.values()])

return cast(
Optional[Mapping[str, Mapping[str, Optional[List[List[str]]]]]],
await self._execute_command(RequestType.XReadGroup, args),
)

async def geoadd(
self,
key: str,
Expand Down
34 changes: 34 additions & 0 deletions python/python/glide/async_commands/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,3 +302,37 @@ def to_args(self) -> List[str]:
args.extend([self.ENTRIES_READ_REDIS_API, self.entries_read_id])

return args


class StreamReadGroupOptions(StreamReadOptions):
READ_NOACK_REDIS_API = "NOACK"

def __init__(
self, no_ack=False, block_ms: Optional[int] = None, count: Optional[int] = None
):
"""
Options for reading entries from streams using a consumer group. Can be used as an optional argument to
`XREADGROUP`.

Args:
no_ack (bool): If set, messages are not added to the Pending Entries List (PEL). This is equivalent to
acknowledging the message when it is read. Equivalent to `NOACK` in the Redis API.
block_ms (Optional[int]): If provided, the request will be blocked for the set amount of milliseconds or
until the server has the required number of entries. Equivalent to `BLOCK` in the Redis API.
count (Optional[int]): The maximum number of elements requested. Equivalent to `COUNT` in the Redis API.
"""
super().__init__(block_ms=block_ms, count=count)
self.no_ack = no_ack

def to_args(self) -> List[str]:
"""
Returns the options as a list of string arguments to be used in the `XREADGROUP` command.

Returns:
List[str]: The options as a list of arguments for the `XREADGROUP` command.
"""
args = super().to_args()
if self.no_ack:
args.append(self.READ_NOACK_REDIS_API)

return args
36 changes: 36 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
StreamAddOptions,
StreamGroupOptions,
StreamRangeBound,
StreamReadGroupOptions,
StreamReadOptions,
StreamTrimOptions,
)
Expand Down Expand Up @@ -2039,6 +2040,41 @@ def xgroup_del_consumer(
RequestType.XGroupDelConsumer, [key, group_name, consumer_name]
)

def xreadgroup(
self: TTransaction,
keys_and_ids: Mapping[str, str],
group_name: str,
consumer_name: str,
options: Optional[StreamReadGroupOptions] = None,
) -> TTransaction:
"""
Reads entries from the given streams owned by a consumer group.

See https://valkey.io/commands/xreadgroup for more details.

Args:
keys_and_ids (Mapping[str, str]): A mapping of stream keys to stream entry IDs to read from. The special ">"
ID returns messages that were never delivered to any other consumer. Any other valid ID will return
entries pending for the consumer with IDs greater than the one provided.
group_name (str): The consumer group name.
consumer_name (str): The consumer name. The consumer will be auto-created if it does not already exist.
options (Optional[StreamReadGroupOptions]): Options detailing how to read the stream.

Command response:
Optional[Mapping[str, Mapping[str, Optional[List[List[str]]]]]]: A mapping of stream keys, to a mapping of
stream IDs, to a list of pairings with format `[[field, entry], [field, entry], ...]`.
Returns None if the BLOCK option is given and a timeout occurs, or if there is no stream that can be served.
"""
args = ["GROUP", group_name, consumer_name]
if options is not None:
args.extend(options.to_args())

args.append("STREAMS")
args.extend([key for key in keys_and_ids.keys()])
args.extend([value for value in keys_and_ids.values()])

return self.append_command(RequestType.XReadGroup, args)

def geoadd(
self: TTransaction,
key: str,
Expand Down
Loading
Loading