Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: add BZPOPMIN and BZPOPMAX commands #1399

Merged
merged 4 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
* Python: Added ZRANGESTORE command ([#1377](https://github.com/aws/glide-for-redis/pull/1377))
* Python: Added ZDIFFSTORE command ([#1378](https://github.com/aws/glide-for-redis/pull/1378))
* Python: Added ZDIFF command ([#1401](https://github.com/aws/glide-for-redis/pull/1401))
* Python: Added BZPOPMIN and BZPOPMAX commands ([#1399](https://github.com/aws/glide-for-redis/pull/1399))


#### Fixes
* Python: Fix typing error "‘type’ object is not subscriptable" ([#1203](https://github.com/aws/glide-for-redis/pull/1203))
Expand Down
88 changes: 88 additions & 0 deletions glide-core/src/client/value_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub(crate) enum ExpectedReturnType {
ArrayOfArraysOfDoubleOrNull,
ArrayOfKeyValuePairs,
ZMPopReturnType,
KeyWithMemberAndScore,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come this wasn't included in the Java implementation?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ig it was forgotten, good catch:)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah its because the conversion is needed for RESP2 responses. The Java wrapper doesn't support RESP2 yet but Python does

}

pub(crate) fn convert_to_expected_type(
Expand Down Expand Up @@ -320,6 +321,28 @@ pub(crate) fn convert_to_expected_type(
)
.into()),
},
// Used by BZPOPMIN/BZPOPMAX, which return an array consisting of the key of the sorted set that was popped, the popped member, and its score.
// RESP2 returns the score as a string, but RESP3 returns the score as a double. Here we convert string scores into type double.
ExpectedReturnType::KeyWithMemberAndScore => match value {
Value::Nil => Ok(value),
Value::Array(ref array) if array.len() == 3 && matches!(array[2], Value::Double(_)) => {
Ok(value)
}
Value::Array(mut array)
if array.len() == 3
&& matches!(array[2], Value::BulkString(_) | Value::SimpleString(_)) =>
{
array[2] =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this only convert the third argument to double? Can there be more arguments with more keys where the timeout is greater than the third argument?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redis> ZADD zset1 0 a 1 b 2 c
(integer) 3
redis> BZPOPMIN zset1 zset2 0
1) "zset1"
2) "a"
3) "0"

replay can be one of the following:

Nil reply: when no element could be popped and the timeout expired.
Array reply: the keyname, popped member, and its score.

so when its an array replay, which means we popped an element, we want to convert just the score from string to double

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, thanks!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

np

convert_to_expected_type(array[2].clone(), Some(ExpectedReturnType::Double))?;
Ok(Value::Array(array))
}
_ => Err((
ErrorKind::TypeError,
"Response couldn't be converted to an array containing a key, member, and score",
format!("(response was {:?})", value),
)
.into()),
},
}
}

Expand Down Expand Up @@ -454,6 +477,7 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option<ExpectedReturnType> {
b"ZRANK" | b"ZREVRANK" => cmd
.position(b"WITHSCORE")
.map(|_| ExpectedReturnType::ZRankReturnType),
b"BZPOPMIN" | b"BZPOPMAX" => Some(ExpectedReturnType::KeyWithMemberAndScore),
b"SPOP" => {
if cmd.arg_idx(2).is_some() {
Some(ExpectedReturnType::Set)
Expand Down Expand Up @@ -815,6 +839,70 @@ mod tests {
));
}

#[test]
fn convert_bzpopmin_bzpopmax() {
assert!(matches!(
expected_type_for_cmd(
redis::cmd("BZPOPMIN")
.arg("myzset1")
.arg("myzset2")
.arg("1")
),
Some(ExpectedReturnType::KeyWithMemberAndScore)
));

assert!(matches!(
expected_type_for_cmd(
redis::cmd("BZPOPMAX")
.arg("myzset1")
.arg("myzset2")
.arg("1")
),
Some(ExpectedReturnType::KeyWithMemberAndScore)
));

let array_with_double_score = Value::Array(vec![
Value::BulkString(b"key1".to_vec()),
Value::BulkString(b"member1".to_vec()),
Value::Double(2.0),
]);
let result = convert_to_expected_type(
array_with_double_score.clone(),
Some(ExpectedReturnType::KeyWithMemberAndScore),
)
.unwrap();
assert_eq!(array_with_double_score, result);

let array_with_string_score = Value::Array(vec![
Value::BulkString(b"key1".to_vec()),
Value::BulkString(b"member1".to_vec()),
Value::BulkString(b"2.0".to_vec()),
]);
let result = convert_to_expected_type(
array_with_string_score.clone(),
Some(ExpectedReturnType::KeyWithMemberAndScore),
)
.unwrap();
assert_eq!(array_with_double_score, result);

let converted_nil_value =
convert_to_expected_type(Value::Nil, Some(ExpectedReturnType::KeyWithMemberAndScore))
.unwrap();
assert_eq!(Value::Nil, converted_nil_value);

let array_with_unexpected_length = Value::Array(vec![
Value::BulkString(b"key1".to_vec()),
Value::BulkString(b"member1".to_vec()),
Value::Double(2.0),
Value::Double(2.0),
]);
assert!(convert_to_expected_type(
array_with_unexpected_length,
Some(ExpectedReturnType::KeyWithMemberAndScore)
)
.is_err());
}

#[test]
fn convert_zank_zrevrank_only_if_withsocres_is_included() {
assert!(matches!(
Expand Down
72 changes: 72 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2285,6 +2285,42 @@ async def zpopmax(
),
)

async def bzpopmax(
self, keys: List[str], timeout: float
) -> Optional[List[Union[str, float]]]:
"""
Pops the member with the highest score from the first non-empty sorted set, with the given keys being checked in
the order that they are given. Blocks the connection when there are no members to remove from any of the given
sorted sets.

When in cluster mode, all keys must map to the same hash slot.

`BZPOPMAX` is the blocking variant of `ZPOPMAX`.

`BZPOPMAX` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.

See https://valkey.io/commands/bzpopmax for more details.

Args:
keys (List[str]): The keys of the sorted sets.
timeout (float): The number of seconds to wait for a blocking operation to complete.
A value of 0 will block indefinitely.

Returns:
Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself,
and the member score. If no member could be popped and the `timeout` expired, returns None.

Examples:
>>> await client.zadd("my_sorted_set1", {"member1": 10.0, "member2": 5.0})
2 # Two elements have been added to the sorted set at "my_sorted_set1".
>>> await client.bzpopmax(["my_sorted_set1", "my_sorted_set2"], 0.5)
['my_sorted_set1', 'member1', 10.0] # "member1" with a score of 10.0 has been removed from "my_sorted_set1".
"""
return cast(
Optional[List[Union[str, float]]],
await self._execute_command(RequestType.BZPopMax, keys + [str(timeout)]),
)

async def zpopmin(
self, key: str, count: Optional[int] = None
) -> Mapping[str, float]:
Expand Down Expand Up @@ -2317,6 +2353,42 @@ async def zpopmin(
),
)

async def bzpopmin(
self, keys: List[str], timeout: float
) -> Optional[List[Union[str, float]]]:
"""
Pops the member with the lowest score from the first non-empty sorted set, with the given keys being checked in
the order that they are given. Blocks the connection when there are no members to remove from any of the given
sorted sets.

When in cluster mode, all keys must map to the same hash slot.

`BZPOPMIN` is the blocking variant of `ZPOPMIN`.

`BZPOPMIN` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.

See https://valkey.io/commands/bzpopmin for more details.

Args:
keys (List[str]): The keys of the sorted sets.
timeout (float): The number of seconds to wait for a blocking operation to complete.
A value of 0 will block indefinitely.

Returns:
Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself,
and the member score. If no member could be popped and the `timeout` expired, returns None.

Examples:
>>> await client.zadd("my_sorted_set1", {"member1": 10.0, "member2": 5.0})
2 # Two elements have been added to the sorted set at "my_sorted_set1".
>>> await client.bzpopmin(["my_sorted_set1", "my_sorted_set2"], 0.5)
['my_sorted_set1', 'member2', 5.0] # "member2" with a score of 5.0 has been removed from "my_sorted_set1".
"""
return cast(
Optional[List[Union[str, float]]],
await self._execute_command(RequestType.BZPopMin, keys + [str(timeout)]),
)

async def zrange(
self,
key: str,
Expand Down
46 changes: 46 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -1624,6 +1624,29 @@ def zpopmax(
RequestType.ZPopMax, [key, str(count)] if count else [key]
)

def bzpopmax(self: TTransaction, keys: List[str], timeout: float) -> TTransaction:
aaron-congo marked this conversation as resolved.
Show resolved Hide resolved
"""
Pops the member with the highest score from the first non-empty sorted set, with the given keys being checked in
the order that they are given. Blocks the connection when there are no members to remove from any of the given
sorted sets.

`BZPOPMAX` is the blocking variant of `ZPOPMAX`.

`BZPOPMAX` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.

See https://valkey.io/commands/bzpopmax for more details.

Args:
keys (List[str]): The keys of the sorted sets.
timeout (float): The number of seconds to wait for a blocking operation to complete.
A value of 0 will block indefinitely.

Command response:
Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself,
and the member score. If no member could be popped and the `timeout` expired, returns None.
"""
return self.append_command(RequestType.BZPopMax, keys + [str(timeout)])

def zpopmin(
self: TTransaction, key: str, count: Optional[int] = None
) -> TTransaction:
Expand All @@ -1647,6 +1670,29 @@ def zpopmin(
RequestType.ZPopMin, [key, str(count)] if count else [key]
)

def bzpopmin(self: TTransaction, keys: List[str], timeout: float) -> TTransaction:
aaron-congo marked this conversation as resolved.
Show resolved Hide resolved
"""
Pops the member with the lowest score from the first non-empty sorted set, with the given keys being checked in
the order that they are given. Blocks the connection when there are no members to remove from any of the given
sorted sets.

`BZPOPMIN` is the blocking variant of `ZPOPMIN`.

`BZPOPMIN` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.

See https://valkey.io/commands/bzpopmin for more details.

Args:
keys (List[str]): The keys of the sorted sets.
timeout (float): The number of seconds to wait for a blocking operation to complete.
A value of 0 will block indefinitely.

Command response:
Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself,
and the member score. If no member could be popped and the `timeout` expired, returns None.
"""
return self.append_command(RequestType.BZPopMin, keys + [str(timeout)])

def zrange(
self: TTransaction,
key: str,
Expand Down
80 changes: 80 additions & 0 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1833,6 +1833,46 @@ async def test_zpopmin(self, redis_client: TRedisClient):

assert await redis_client.zpopmin("non_exisitng_key") == {}

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_bzpopmin(self, redis_client: TRedisClient):
key1 = f"{{testKey}}:{get_random_string(10)}"
key2 = f"{{testKey}}:{get_random_string(10)}"
non_existing_key = f"{{testKey}}:non_existing_key"

assert await redis_client.zadd(key1, {"a": 1.0, "b": 1.5}) == 2
assert await redis_client.zadd(key2, {"c": 2.0}) == 1
assert await redis_client.bzpopmin([key1, key2], 0.5) == [key1, "a", 1.0]
assert await redis_client.bzpopmin([non_existing_key, key2], 0.5) == [
key2,
"c",
2.0,
]
assert await redis_client.bzpopmin(["non_existing_key"], 0.5) is None

# invalid argument - key list must not be empty
with pytest.raises(RequestError):
await redis_client.bzpopmin([], 0.5)

# key exists, but it is not a sorted set
assert await redis_client.set("foo", "value") == OK
with pytest.raises(RequestError):
await redis_client.bzpopmin(["foo"], 0.5)

aaron-congo marked this conversation as resolved.
Show resolved Hide resolved
# same-slot requirement
if isinstance(redis_client, RedisClusterClient):
with pytest.raises(RequestError) as e:
await redis_client.bzpopmin(["abc", "zxy", "lkn"], 0.5)
assert "CrossSlot" in str(e)

async def endless_bzpopmin_call():
await redis_client.bzpopmin(["non_existent_key"], 0)

# bzpopmax is called against a non-existing key with no timeout, but we wrap the call in an asyncio timeout to
# avoid having the test block forever
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(endless_bzpopmin_call(), timeout=0.5)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_zpopmax(self, redis_client: TRedisClient):
Expand All @@ -1852,6 +1892,46 @@ async def test_zpopmax(self, redis_client: TRedisClient):

assert await redis_client.zpopmax("non_exisitng_key") == {}

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_bzpopmax(self, redis_client: TRedisClient):
key1 = f"{{testKey}}:{get_random_string(10)}"
key2 = f"{{testKey}}:{get_random_string(10)}"
non_existing_key = f"{{testKey}}:non_existing_key"

assert await redis_client.zadd(key1, {"a": 1.0, "b": 1.5}) == 2
assert await redis_client.zadd(key2, {"c": 2.0}) == 1
assert await redis_client.bzpopmax([key1, key2], 0.5) == [key1, "b", 1.5]
assert await redis_client.bzpopmax([non_existing_key, key2], 0.5) == [
key2,
"c",
2.0,
]
assert await redis_client.bzpopmax(["non_existing_key"], 0.5) is None

# invalid argument - key list must not be empty
with pytest.raises(RequestError):
await redis_client.bzpopmax([], 0.5)

# key exists, but it is not a sorted set
assert await redis_client.set("foo", "value") == OK
with pytest.raises(RequestError):
await redis_client.bzpopmax(["foo"], 0.5)
aaron-congo marked this conversation as resolved.
Show resolved Hide resolved

# same-slot requirement
if isinstance(redis_client, RedisClusterClient):
with pytest.raises(RequestError) as e:
await redis_client.bzpopmax(["abc", "zxy", "lkn"], 0.5)
assert "CrossSlot" in str(e)

async def endless_bzpopmax_call():
await redis_client.bzpopmax(["non_existent_key"], 0)

# bzpopmax is called against a non-existing key with no timeout, but we wrap the call in an asyncio timeout to
# avoid having the test block forever
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(endless_bzpopmax_call(), timeout=0.5)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_zrange_by_index(self, redis_client: TRedisClient):
Expand Down
12 changes: 8 additions & 4 deletions python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,16 @@ async def transaction_test(
args.append([2.0, 3.0])
transaction.zrangestore(key8, key8, RangeByIndex(0, -1))
args.append(3)
transaction.zpopmin(key8)
args.append({"two": 2.0})
transaction.bzpopmin([key8], 0.5)
args.append([key8, "two", 2.0])
transaction.bzpopmax([key8], 0.5)
args.append([key8, "four", 4.0])
transaction.zpopmax(key8)
args.append({"four": 4})
args.append({"three": 3.0})
transaction.zpopmin(key8)
args.append({})
transaction.zremrangebyscore(key8, InfBound.NEG_INF, InfBound.POS_INF)
args.append(1)
args.append(0)
transaction.zremrangebylex(key8, InfBound.NEG_INF, InfBound.POS_INF)
args.append(0)
transaction.zdiffstore(key8, [key8, key8])
Expand Down
Loading