Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: add FCALL_RO command #1721

Merged
merged 10 commits into from
Jul 1, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
* Python: Added FUNCTION DELETE command ([#1714](https://github.com/aws/glide-for-redis/pull/1714))
* Python: Added SSCAN command ([#1709](https://github.com/aws/glide-for-redis/pull/1709))
* Python: Added LCS command ([#1716](https://github.com/aws/glide-for-redis/pull/1716))
* Python: Added FCALL_RO command ([#1721](https://github.com/aws/glide-for-redis/pull/1721))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
43 changes: 43 additions & 0 deletions python/python/glide/async_commands/cluster_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,49 @@ async def function_delete(
),
)

async def fcall_ro(
self,
function: str,
keys: Optional[List[str]] = None,
arguments: Optional[List[str]] = None,
route: Optional[Route] = None,
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
) -> TClusterResponse[TResult]:
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
"""
Invokes a previously loaded read-only function.

See https://valkey.io/commands/fcall_ro for more details.

Args:
function (str): The function name.
keys (List[str]): An `array` of keys accessed by the function. To ensure the correct
execution of functions, all names of keys that a function accesses must be
explicitly provided as `keys`.
arguments (List[str]): An `array` of `function` arguments. `arguments` should not
represent names of keys.
route (Optional[Route]): Specifies the routing configuration of the command. The client
will route the command to the nodes defined by `route`.

Returns:
TResult: The return value depends on the function that was executed.
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved

Examples:
>>> await client.FCallReadOnly("Deep_Thought", ALL_NODES)
42 # The return value on the function that was executed

Since: Redis version 7.0.0.
"""
args = []
if keys is not None:
args.extend([function, str(len(keys))] + keys)
else:
args.extend([function, str(0)])
if arguments is not None:
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
args.extend(arguments)
return cast(
TResult,
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
await self._execute_command(RequestType.FCallReadOnly, args, route),
)

async def time(self, route: Optional[Route] = None) -> TClusterResponse[List[str]]:
"""
Returns the server time.
Expand Down
41 changes: 41 additions & 0 deletions python/python/glide/async_commands/standalone_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,47 @@ async def function_delete(self, library_name: str) -> TOK:
),
)

async def fcall_ro(
self,
function: str,
keys: Optional[List[str]] = None,
arguments: Optional[List[str]] = None,
) -> TResult:
"""
Invokes a previously loaded read-only function.

See https://valkey.io/commands/fcall_ro for more details.

Args:
function (str): The function name.
keys (List[str]): An `array` of keys accessed by the function. To ensure the correct
execution of functions, all names of keys that a function accesses must be
explicitly provided as `keys`.
arguments (List[str]): An `array` of `function` arguments. `arguments` should not
represent names of keys.

Returns:
TResult: The return value depends on the function that was executed.

Examples:
>>> await client.FCallReadOnly("Deep_Thought", ["key1"], ["Answer", "to", "the",
"Ultimate", "Question", "of", "Life,", "the", "Universe,", "and", "Everything"])
42 # The return value on the function that was executed

Since: Redis version 7.0.0.
"""
args = []
if keys is not None:
args.extend([function, str(len(keys))] + keys)
else:
args.extend([function, str(0)])
if arguments is not None:
args.extend(arguments)
return cast(
TResult,
await self._execute_command(RequestType.FCallReadOnly, args),
)

async def time(self) -> List[str]:
"""
Returns the server time.
Expand Down
33 changes: 33 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -1841,6 +1841,39 @@ def function_delete(self: TTransaction, library_name: str) -> TTransaction:
[library_name],
)

def fcall_ro(
self: TTransaction,
function: str,
keys: Optional[List[str]] = None,
arguments: Optional[List[str]] = None,
) -> TTransaction:
"""
Invokes a previously loaded read-only function.

See https://valkey.io/commands/fcall_ro for more details.

Args:
function (str): The function name.
keys (List[str]): An `array` of keys accessed by the function. To ensure the correct
execution of functions, all names of keys that a function accesses must be
explicitly provided as `keys`.
arguments (List[str]): An `array` of `function` arguments. `arguments` should not
represent names of keys.

Command Response::
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
TResult: The return value depends on the function that was executed.

Since: Redis version 7.0.0.
"""
args = []
if keys is not None:
args.extend([function, str(len(keys))] + keys)
else:
args.extend([function, str(0)])
if arguments is not None:
args.extend(arguments)
return self.append_command(RequestType.FCallReadOnly, args)

def xadd(
self: TTransaction,
key: str,
Expand Down
131 changes: 117 additions & 14 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@
TrimByMaxLen,
TrimByMinId,
)
from glide.async_commands.transaction import (
BaseTransaction,
ClusterTransaction,
Transaction,
)
from glide.config import (
ClusterClientConfiguration,
GlideClientConfiguration,
Expand Down Expand Up @@ -6650,17 +6655,12 @@ async def test_function_load(self, redis_client: TGlideClient):

assert await redis_client.function_load(code) == lib_name

# TODO: change when FCALL, FCALL_RO is implemented
# TODO: change when FCALL is implemented
assert (
await redis_client.custom_command(["FCALL", func_name, "0", "one", "two"])
== "one"
)
assert (
await redis_client.custom_command(
["FCALL_RO", func_name, "0", "one", "two"]
)
== "one"
)
assert await redis_client.fcall_ro(func_name, arguments=["one", "two"]) == "one"

# TODO: add FUNCTION LIST once implemented

Expand All @@ -6680,6 +6680,11 @@ async def test_function_load(self, redis_client: TGlideClient):

assert await redis_client.function_load(new_code, True) == lib_name

# TODO: add when FCALL is implemented
assert await redis_client.fcall_ro(func2_name, arguments=["one", "two"]) == 2

assert await redis_client.function_flush(FlushMode.SYNC) is OK

@pytest.mark.parametrize("cluster_mode", [True])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
@pytest.mark.parametrize("single_route", [True, False])
Expand All @@ -6699,22 +6704,25 @@ async def test_function_load_cluster_with_route(

assert await redis_client.function_load(code, False, route) == lib_name

# TODO: change when FCALL, FCALL_RO is implemented.
# TODO: change when FCALL is implemented.
assert (
await redis_client.custom_command(
["FCALL", func_name, "0", "one", "two"],
SlotKeyRoute(SlotType.PRIMARY, "1"),
)
== "one"
)
assert (
await redis_client.custom_command(
["FCALL_RO", func_name, "0", "one", "two"],
SlotKeyRoute(SlotType.PRIMARY, "1"),
)
== "one"
result = await redis_client.fcall_ro(
func_name, arguments=["one", "two"], route=route
)

if single_route:
assert result == "one"
else:
assert isinstance(result, dict)
for nodeResponse in result.values():
assert nodeResponse == "one"

# TODO: add FUNCTION LIST once implemented

# re-load library without replace
Expand All @@ -6733,6 +6741,20 @@ async def test_function_load_cluster_with_route(

assert await redis_client.function_load(new_code, True, route) == lib_name

# TODO: add when FCALL is implemented.
result = await redis_client.fcall_ro(
func2_name, arguments=["one", "two"], route=route
)

if single_route:
assert result == 2
else:
assert isinstance(result, dict)
for nodeResponse in result.values():
assert nodeResponse == 2

assert await redis_client.function_flush(FlushMode.SYNC, route) is OK

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_function_flush(self, redis_client: TGlideClient):
Expand Down Expand Up @@ -6849,6 +6871,86 @@ async def test_function_delete_with_routing(
await redis_client.function_delete(lib_name)
assert "Library not found" in str(e)

@pytest.mark.parametrize("cluster_mode", [True])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_fcall_with_key(self, redis_client: GlideClusterClient):
min_version = "7.0.0"
if await check_if_server_version_lt(redis_client, min_version):
return pytest.mark.skip(reason=f"Redis version required >= {min_version}")

key1 = f"{{testKey}}:1-{get_random_string(10)}"
key2 = f"{{testKey}}:2-{get_random_string(10)}"
keys = [key1, key2]
route = SlotKeyRoute(SlotType.PRIMARY, key1)
lib_name = f"mylib1C{get_random_string(5)}"
func_name = f"myfunc1c{get_random_string(5)}"
code = generate_lua_lib_code(lib_name, {func_name: "return keys[1]"}, True)

assert await redis_client.function_flush(FlushMode.SYNC, route) is OK
assert await redis_client.function_load(code, False, route) == lib_name

# TODO: add when FCALL is implemented.
assert await redis_client.fcall_ro(func_name, keys=keys, arguments=[]) == key1

transaction = ClusterTransaction()
# TODO: add when FCALL is implemented.
transaction.fcall_ro(func_name, keys=keys, arguments=[])

# check response from a routed transaction request
result = await redis_client.exec(transaction, route)
assert result is not None
assert result[0] == key1

# if no route given, GLIDE should detect it automatically
result = await redis_client.exec(transaction)
assert result is not None
assert result[0] == key1

assert await redis_client.function_flush(FlushMode.SYNC, route) is OK

@pytest.mark.parametrize("cluster_mode", [True])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_fcall_readonly_function(self, redis_client: GlideClusterClient):
min_version = "7.0.0"
if await check_if_server_version_lt(redis_client, min_version):
return pytest.mark.skip(reason=f"Redis version required >= {min_version}")

lib_name = f"fcall_readonly_function{get_random_string(5)}"
# intentionally using a REPLICA route
replicaRoute = SlotKeyRoute(SlotType.REPLICA, lib_name)
primaryRoute = SlotKeyRoute(SlotType.PRIMARY, lib_name)
func_name = f"fcall_readonly_function{get_random_string(5)}"

# function $funcName returns a magic number
code = generate_lua_lib_code(lib_name, {func_name: "return 42"}, False)

assert await redis_client.function_load(code, False) == lib_name

# On a replica node should fail, because a function isn't guaranteed to be RO
# TODO: add when FCALL is implemented.
with pytest.raises(RequestError):
assert await redis_client.fcall_ro(
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
func_name, keys=[], arguments=[], route=replicaRoute
)

# fcall_ro also fails to run it even on primary - another error
with pytest.raises(RequestError):
assert await redis_client.fcall_ro(
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
func_name, keys=[], arguments=[], route=primaryRoute
)

# create the same function, but with RO flag
code = generate_lua_lib_code(lib_name, {func_name: "return 42"}, True)
assert await redis_client.function_load(code, True) == lib_name

# fcall should succeed now
assert (
await redis_client.fcall_ro(
func_name, keys=[], arguments=[], route=replicaRoute
)
== 42
)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_srandmember(self, redis_client: TGlideClient):
Expand Down Expand Up @@ -7369,6 +7471,7 @@ async def test_multi_key_command_returns_cross_slot_error(
redis_client.lcs("abc", "def"),
redis_client.lcs_len("abc", "def"),
redis_client.lcs_idx("abc", "def"),
redis_client.fcall_ro("func", ["abc", "zxy", "lkn"], []),
]
)

Expand Down
6 changes: 6 additions & 0 deletions python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ async def transaction_test(
args.append(OK)
transaction.function_flush(FlushMode.SYNC)
args.append(OK)
transaction.function_load(code, True)
args.append(lib_name)
transaction.fcall_ro(func_name, [], arguments=["one", "two"])
args.append("one")
transaction.fcall_ro(func_name, [key], arguments=["one", "two"])
args.append("one")

transaction.dbsize()
args.append(0)
Expand Down
Loading