Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved RedisCluster's reinitialize_steps and documentation #1765

Merged
merged 8 commits into from
Dec 2, 2021
36 changes: 29 additions & 7 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,14 +414,25 @@ def __init__(
stale data.
When set to true, read commands will be assigned between the
primary and its replications in a Round-Robin manner.
:cluster_error_retry_attempts: 'int'
:cluster_error_retry_attempts: 'int'
Retry command execution attempts when encountering ClusterDownError
or ConnectionError
:retry_on_timeout: 'bool'
:retry_on_timeout: 'bool'
To specify a retry policy, first set `retry_on_timeout` to `True`
then set `retry` to a valid `Retry` object
:retry: 'Retry'
:retry: 'Retry'
a `Retry` object
:reinitialize_steps: 'int'
Specifies the number of MOVED errors that need to occur before
reinitializing the whole cluster topology. If a MOVED error occurs
and the cluster does not need to be reinitialized on this current
error handling, only the MOVED slot will be patched with the
redirected node.
To reinitialize the cluster on every MOVED error, set
reinitialize_steps to 1.
To avoid reinitializing the cluster on moved errors, set
reinitialize_steps to 0.

:**kwargs:
Extra arguments that will be sent into Redis instance when created
(See Official redis-py doc for supported kwargs
Expand Down Expand Up @@ -727,7 +738,9 @@ def _determine_nodes(self, *args, **kwargs):
return [node]

def _should_reinitialized(self):
# In order not to reinitialize the cluster, the user can set
# To reinitialize the cluster on every MOVED error,
# set reinitialize_steps to 1.
# To avoid reinitializing the cluster on moved errors, set
# reinitialize_steps to 0.
if self.reinitialize_steps == 0:
return False
Expand Down Expand Up @@ -958,8 +971,8 @@ def _execute_command(self, target_node, *args, **kwargs):
# redirected node output and try again. If MovedError exceeds
# 'reinitialize_steps' number of times, we will force
# reinitializing the tables, and then try again.
# 'reinitialize_steps' counter will increase faster when the
# same client object is shared between multiple threads. To
# 'reinitialize_steps' counter will increase faster when
# the same client object is shared between multiple threads. To
# reduce the frequency you can set this variable in the
# RedisCluster constructor.
log.exception("MovedError")
Expand Down Expand Up @@ -1055,6 +1068,10 @@ def __repr__(self):
def __eq__(self, obj):
return isinstance(obj, ClusterNode) and obj.name == self.name

def __del__(self):
if self.redis_connection is not None:
self.redis_connection.close()


class LoadBalancer:
"""
Expand Down Expand Up @@ -1300,6 +1317,11 @@ def initialize(self):
startup_node.host, startup_node.port, **copy_kwargs
)
self.startup_nodes[startup_node.name].redis_connection = r
# Make sure cluster mode is enabled on this node
if bool(r.info().get("cluster_enabled")) is False:
raise RedisClusterException(
"Cluster mode is not enabled on this node"
)
cluster_slots = r.execute_command("CLUSTER SLOTS")
startup_nodes_reachable = True
except (ConnectionError, TimeoutError) as e:
Expand Down Expand Up @@ -1327,7 +1349,7 @@ def initialize(self):
message = e.__str__()
raise RedisClusterException(
'ERROR sending "cluster slots" command to redis '
f"server: {startup_node}. error: {message}"
f"server {startup_node.name}. error: {message}"
)

# CLUSTER SLOTS command results in the following output:
Expand Down
20 changes: 19 additions & 1 deletion tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def get_mocked_redis_client(func=None, *args, **kwargs):
"""
cluster_slots = kwargs.pop("cluster_slots", default_cluster_slots)
coverage_res = kwargs.pop("coverage_result", "yes")
cluster_enabled = kwargs.pop("cluster_enabled", True)
with patch.object(Redis, "execute_command") as execute_command_mock:

def execute_command(*_args, **_kwargs):
Expand All @@ -92,7 +93,9 @@ def execute_command(*_args, **_kwargs):
return mock_cluster_slots
elif _args[0] == "COMMAND":
return {"get": [], "set": []}
elif _args[1] == "cluster-require-full-coverage":
elif _args[0] == "INFO":
return {"cluster_enabled": cluster_enabled}
elif len(_args) > 1 and _args[1] == "cluster-require-full-coverage":
return {"cluster-require-full-coverage": coverage_res}
elif func is not None:
return func(*args, **kwargs)
Expand Down Expand Up @@ -1974,6 +1977,17 @@ def test_init_slots_cache(self):

assert len(n_manager.nodes_cache) == 6

def test_init_slots_cache_cluster_mode_disabled(self):
"""
Test that creating a RedisCluster failes if one of the startup nodes
has cluster mode disabled
"""
with pytest.raises(RedisClusterException) as e:
get_mocked_redis_client(
host=default_host, port=default_port, cluster_enabled=False
)
assert "Cluster mode is not enabled on this node" in str(e.value)

def test_empty_startup_nodes(self):
"""
It should not be possible to create a node manager with no nodes
Expand Down Expand Up @@ -2044,6 +2058,8 @@ def create_mocked_redis_node(host, port, **kwargs):
def execute_command(*args, **kwargs):
if args[0] == "CLUSTER SLOTS":
return result
elif args[0] == "INFO":
return {"cluster_enabled": True}
elif args[1] == "cluster-require-full-coverage":
return {"cluster-require-full-coverage": "yes"}
else:
Expand Down Expand Up @@ -2108,6 +2124,8 @@ def execute_command(*args, **kwargs):
["127.0.0.1", 7002, "node_2"],
],
]
elif args[0] == "INFO":
return {"cluster_enabled": True}
elif args[1] == "cluster-require-full-coverage":
return {"cluster-require-full-coverage": "yes"}

Expand Down
4 changes: 2 additions & 2 deletions tests/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ class MyConnection(redis.UnixDomainSocketConnection):
pass

pool = redis.ConnectionPool.from_url(
'unix:///socket', connection_class=MyConnection
"unix:///socket", connection_class=MyConnection
)
assert pool.connection_class == MyConnection

Expand All @@ -469,7 +469,7 @@ class MyConnection(redis.SSLConnection):
pass

pool = redis.ConnectionPool.from_url(
'rediss://my.host', connection_class=MyConnection
"rediss://my.host", connection_class=MyConnection
)
assert pool.connection_class == MyConnection

Expand Down