Skip to content

Commit

Permalink
async_cluster: optimisations (redis#2205)
Browse files Browse the repository at this point in the history
- return true from execute_pipeline if there are any errors
- use todo list to speedup retries
- store initialisation node in CommandsParser object
- add sync context manager for pipeline
- use if/else instead of try/except
- make command a function argument in _determine_nodes & _determine_slot

- add async cluster pipeline benchmark script
  • Loading branch information
utkarshgupta137 authored Jun 1, 2022
1 parent 7880460 commit 05fc203
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 93 deletions.
4 changes: 2 additions & 2 deletions benchmarks/cluster_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ async def main(loop, gather=None):
port = 16379
password = None

count = 1000
size = 16
count = 10000
size = 256

asyncio.run(main("asyncio"))
asyncio.run(main("asyncio", gather=False))
Expand Down
107 changes: 107 additions & 0 deletions benchmarks/cluster_async_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import asyncio
import functools
import time

import aioredis_cluster
import aredis
import uvloop

import redis.asyncio as redispy


def timer(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
tic = time.perf_counter()
await func(*args, **kwargs)
toc = time.perf_counter()
return f"{toc - tic:.4f}"

return wrapper


@timer
async def warmup(client):
await asyncio.gather(
*(asyncio.create_task(client.exists(f"bench:warmup_{i}")) for i in range(100))
)


@timer
async def run(client):
data_str = "a" * size
data_int = int("1" * size)

for i in range(count):
with client.pipeline() as pipe:
await (
pipe.set(f"bench:str_{i}", data_str)
.set(f"bench:int_{i}", data_int)
.get(f"bench:str_{i}")
.get(f"bench:int_{i}")
.hset("bench:hset", str(i), data_str)
.hget("bench:hset", str(i))
.incr("bench:incr")
.lpush("bench:lpush", data_int)
.lrange("bench:lpush", 0, 300)
.lpop("bench:lpush")
.execute()
)


async def main(loop):
arc = aredis.StrictRedisCluster(
host=host,
port=port,
password=password,
max_connections=2**31,
max_connections_per_node=2**31,
readonly=False,
reinitialize_steps=count,
skip_full_coverage_check=True,
decode_responses=False,
max_idle_time=count,
idle_check_interval=count,
)
print(f"{loop} {await warmup(arc)} aredis")
print(await run(arc))
arc.connection_pool.disconnect()

aiorc = await aioredis_cluster.create_redis_cluster(
[(host, port)],
password=password,
state_reload_interval=count,
idle_connection_timeout=count,
pool_maxsize=2**31,
)
print(f"{loop} {await warmup(aiorc)} aioredis-cluster")
print(await run(aiorc))
aiorc.close()
await aiorc.wait_closed()

async with redispy.RedisCluster(
host=host,
port=port,
password=password,
reinitialize_steps=count,
read_from_replicas=False,
decode_responses=False,
max_connections=2**31,
) as rca:
print(f"{loop} {await warmup(rca)} redispy")
print(await run(rca))


if __name__ == "__main__":
host = "localhost"
port = 16379
password = None

count = 10000
size = 256

asyncio.run(main("asyncio"))

uvloop.install()

asyncio.run(main("uvloop"))
165 changes: 86 additions & 79 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,9 +468,8 @@ def keyslot(self, key: EncodableT) -> int:
return key_slot(k)

async def _determine_nodes(
self, *args: Any, node_flag: Optional[str] = None
self, command: str, *args: Any, node_flag: Optional[str] = None
) -> List["ClusterNode"]:
command = args[0]
if not node_flag:
# get the nodes group for this command if it was predefined
node_flag = self.command_flags.get(command)
Expand All @@ -495,16 +494,15 @@ async def _determine_nodes(
# get the node that holds the key's slot
return [
self.nodes_manager.get_node_from_slot(
await self._determine_slot(*args),
await self._determine_slot(command, *args),
self.read_from_replicas and command in READ_COMMANDS,
)
]

async def _determine_slot(self, *args: Any) -> int:
command = args[0]
async def _determine_slot(self, command: str, *args: Any) -> int:
if self.command_flags.get(command) == SLOT_ID:
# The command contains the slot ID
return int(args[1])
return int(args[0])

# Get the keys in the command

Expand All @@ -516,19 +514,17 @@ async def _determine_slot(self, *args: Any) -> int:
# - fix: https://github.com/redis/redis/pull/9733
if command in ("EVAL", "EVALSHA"):
# command syntax: EVAL "script body" num_keys ...
if len(args) <= 2:
raise RedisClusterException(f"Invalid args in command: {args}")
num_actual_keys = args[2]
eval_keys = args[3 : 3 + num_actual_keys]
if len(args) < 2:
raise RedisClusterException(
f"Invalid args in command: {command, *args}"
)
keys = args[2 : 2 + args[1]]
# if there are 0 keys, that means the script can be run on any node
# so we can just return a random slot
if not eval_keys:
if not keys:
return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
keys = eval_keys
else:
keys = await self.commands_parser.get_keys(
self.nodes_manager.default_node, *args
)
keys = await self.commands_parser.get_keys(command, *args)
if not keys:
# FCALL can call a function with 0 keys, that means the function
# can be run on any node so we can just return a random slot
Expand Down Expand Up @@ -848,13 +844,13 @@ def acquire_connection(self) -> Connection:
self._free.append(connection)

return self._free.popleft()
else:
if len(self._connections) < self.max_connections:
connection = self.connection_class(**self.connection_kwargs)
self._connections.append(connection)
return connection
else:
raise ConnectionError("Too many connections")

if len(self._connections) < self.max_connections:
connection = self.connection_class(**self.connection_kwargs)
self._connections.append(connection)
return connection

raise ConnectionError("Too many connections")

async def parse_response(
self, connection: Connection, command: str, **kwargs: Any
Expand All @@ -872,10 +868,10 @@ async def parse_response(
raise

# Return response
try:
if command in self.response_callbacks:
return self.response_callbacks[command](response, **kwargs)
except KeyError:
return response

return response

async def execute_command(self, *args: Any, **kwargs: Any) -> Any:
# Acquire connection
Expand All @@ -891,7 +887,7 @@ async def execute_command(self, *args: Any, **kwargs: Any) -> Any:
# Release connection
self._free.append(connection)

async def execute_pipeline(self) -> None:
async def execute_pipeline(self) -> bool:
# Acquire connection
connection = self.acquire_connection()

Expand All @@ -901,17 +897,20 @@ async def execute_pipeline(self) -> None:
)

# Read responses
try:
for cmd in self._command_stack:
try:
cmd.result = await self.parse_response(
connection, cmd.args[0], **cmd.kwargs
)
except Exception as e:
cmd.result = e
finally:
# Release connection
self._free.append(connection)
ret = False
for cmd in self._command_stack:
try:
cmd.result = await self.parse_response(
connection, cmd.args[0], **cmd.kwargs
)
except Exception as e:
cmd.result = e
ret = True

# Release connection
self._free.append(connection)

return ret


class NodesManager:
Expand Down Expand Up @@ -1257,6 +1256,13 @@ async def __aexit__(self, exc_type: None, exc_value: None, traceback: None) -> N
def __await__(self) -> Generator[Any, None, "ClusterPipeline"]:
return self.initialize().__await__()

def __enter__(self) -> "ClusterPipeline":
self._command_stack = []
return self

def __exit__(self, exc_type: None, exc_value: None, traceback: None) -> None:
self._command_stack = []

def __bool__(self) -> bool:
return bool(self._command_stack)

Expand Down Expand Up @@ -1310,6 +1316,7 @@ async def execute(

try:
return await self._execute(
self._client,
self._command_stack,
raise_on_error=raise_on_error,
allow_redirections=allow_redirections,
Expand All @@ -1331,60 +1338,60 @@ async def execute(

async def _execute(
self,
client: "RedisCluster",
stack: List["PipelineCommand"],
raise_on_error: bool = True,
allow_redirections: bool = True,
) -> List[Any]:
client = self._client
todo = [
cmd for cmd in stack if not cmd.result or isinstance(cmd.result, Exception)
]

nodes = {}
for cmd in stack:
if not cmd.result or isinstance(cmd.result, Exception):
target_nodes = await client._determine_nodes(*cmd.args)
if not target_nodes:
raise RedisClusterException(
f"No targets were found to execute {cmd.args} command on"
)
if len(target_nodes) > 1:
raise RedisClusterException(
f"Too many targets for command {cmd.args}"
)
for cmd in todo:
target_nodes = await client._determine_nodes(*cmd.args)
if not target_nodes:
raise RedisClusterException(
f"No targets were found to execute {cmd.args} command on"
)
if len(target_nodes) > 1:
raise RedisClusterException(f"Too many targets for command {cmd.args}")

node = target_nodes[0]
if node.name not in nodes:
nodes[node.name] = node
node._command_stack = []
node._command_stack.append(cmd)
node = target_nodes[0]
if node.name not in nodes:
nodes[node.name] = node
node._command_stack = []
node._command_stack.append(cmd)

await asyncio.gather(
errors = await asyncio.gather(
*(asyncio.ensure_future(node.execute_pipeline()) for node in nodes.values())
)

if allow_redirections:
# send each errored command individually
for cmd in stack:
if isinstance(cmd.result, (TryAgainError, MovedError, AskError)):
try:
cmd.result = await client.execute_command(
*cmd.args, **cmd.kwargs
if any(errors):
if allow_redirections:
# send each errored command individually
for cmd in todo:
if isinstance(cmd.result, (TryAgainError, MovedError, AskError)):
try:
cmd.result = await client.execute_command(
*cmd.args, **cmd.kwargs
)
except Exception as e:
cmd.result = e

if raise_on_error:
for cmd in todo:
result = cmd.result
if isinstance(result, Exception):
command = " ".join(map(safe_str, cmd.args))
msg = (
f"Command # {cmd.position + 1} ({command}) of pipeline "
f"caused error: {result.args}"
)
except Exception as e:
cmd.result = e

responses = [cmd.result for cmd in stack]

if raise_on_error:
for cmd in stack:
result = cmd.result
if isinstance(result, Exception):
command = " ".join(map(safe_str, cmd.args))
msg = (
f"Command # {cmd.position + 1} ({command}) of pipeline "
f"caused error: {result.args}"
)
result.args = (msg,) + result.args[1:]
raise result
result.args = (msg,) + result.args[1:]
raise result

return responses
return [cmd.result for cmd in stack]

def _split_command_across_slots(
self, command: str, *keys: KeyT
Expand Down
Loading

0 comments on commit 05fc203

Please sign in to comment.