diff --git a/redis/cluster.py b/redis/cluster.py index 3bf1ad3c40..a05b31faa1 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1091,7 +1091,7 @@ def execute_command(self, *args, **kwargs): # The nodes and slots cache were reinitialized. # Try again with the new cluster setup. retry_attempts -= 1 - if self.retry and isinstance(e, self.retry._supported_errors): + if self.retry and self.retry.is_supported_error(e): backoff = self.retry._backoff.compute( self.cluster_error_retry_attempts - retry_attempts ) @@ -2100,20 +2100,19 @@ def _send_cluster_commands( redis_node = self.get_redis_connection(node) try: connection = get_connection(redis_node, c.args) - except (ConnectionError, TimeoutError) as e: + except BaseException as e: for n in nodes.values(): n.connection_pool.release(n.connection) n.connection = None nodes = {} - if self.retry and isinstance( - e, self.retry._supported_errors - ): + if self.retry and self.retry.is_supported_error(e): backoff = self.retry._backoff.compute(attempts_count) if backoff > 0: time.sleep(backoff) - self.nodes_manager.initialize() - if is_default_node: - self.replace_default_node() + if isinstance(e, (ConnectionError, TimeoutError)): + self.nodes_manager.initialize() + if is_default_node: + self.replace_default_node() raise nodes[node_name] = NodeCommands( redis_node.parse_response, @@ -2229,6 +2228,8 @@ def _send_cluster_commands( if n.connection: n.connection.disconnect() n.connection_pool.release(n.connection) + if len(nodes) > 0: + time.sleep(0.25) raise def _fail_on_redirect(self, allow_redirections): diff --git a/redis/retry.py b/redis/retry.py index 606443053e..59e9d17998 100644 --- a/redis/retry.py +++ b/redis/retry.py @@ -32,6 +32,9 @@ def update_supported_errors(self, specified_errors: list): set(self._supported_errors + tuple(specified_errors)) ) + def is_supported_error(self, error): + return isinstance(error, self._supported_errors) + def call_with_retry(self, do, fail): """ Execute an operation that might fail and returns its result, or diff --git a/tests/test_cluster.py b/tests/test_cluster.py index c330fc947f..9c95540f98 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -2681,8 +2681,10 @@ def raise_error(): m.side_effect = raise_error - with pytest.raises(Exception, match="unexpected error"): - r.pipeline().get("a").execute() + with patch.object(Connection, "disconnect") as d: + with pytest.raises(Exception, match="unexpected error"): + r.pipeline().get("a").execute() + assert d.call_count == 1 for cluster_node in r.nodes_manager.nodes_cache.values(): connection_pool = cluster_node.redis_connection.connection_pool @@ -2984,7 +2986,7 @@ def raise_ask_error(): assert res == ["MOCK_OK"] @pytest.mark.parametrize("error", [ConnectionError, TimeoutError]) - def test_return_previous_acquired_connections(self, r, error): + def test_return_previous_acquired_connections_with_retry(self, r, error): # in order to ensure that a pipeline will make use of connections # from different nodes assert r.keyslot("a") != r.keyslot("b") @@ -3000,7 +3002,13 @@ def raise_error(target_node, *args, **kwargs): get_connection.side_effect = raise_error - r.pipeline().get("a").get("b").execute() + with patch.object(NodesManager, "initialize") as i: + # in order to remove disconnect caused by initialize + i.side_effect = lambda: None + + with patch.object(Connection, "disconnect") as d: + r.pipeline().get("a").get("b").execute() + assert d.call_count == 0 # there should have been two get_connections per execution and # two executions due to exception raised in the first execution @@ -3010,6 +3018,39 @@ def raise_error(target_node, *args, **kwargs): num_of_conns = len(connection_pool._available_connections) assert num_of_conns == connection_pool._created_connections + @pytest.mark.parametrize("error", [RedisClusterException, BaseException]) + def test_return_previous_acquired_connections_without_retry(self, r, error): + # in order to ensure that a pipeline will make use of connections + # from different nodes + assert r.keyslot("a") != r.keyslot("b") + + orig_func = redis.cluster.get_connection + with patch("redis.cluster.get_connection") as get_connection: + + def raise_error(target_node, *args, **kwargs): + if get_connection.call_count == 2: + raise error("mocked error") + else: + return orig_func(target_node, *args, **kwargs) + + get_connection.side_effect = raise_error + + with patch.object(Connection, "disconnect") as d: + with pytest.raises(error): + r.pipeline().get("a").get("b").execute() + assert d.call_count == 0 + + # there should have been two get_connections per execution and + # two executions due to exception raised in the first execution + assert get_connection.call_count == 2 + for cluster_node in r.nodes_manager.nodes_cache.values(): + connection_pool = cluster_node.redis_connection.connection_pool + num_of_conns = len(connection_pool._available_connections) + assert num_of_conns == connection_pool._created_connections + # connection must remain connected + for conn in connection_pool._available_connections: + assert conn._sock is not None + def test_empty_stack(self, r): """ If pipeline is executed with no commands it should diff --git a/tests/test_retry.py b/tests/test_retry.py index 3cfea5c09e..9e23a29b41 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -9,6 +9,8 @@ BusyLoadingError, ConnectionError, ReadOnlyError, + RedisClusterException, + RedisError, TimeoutError, ) from redis.retry import Retry @@ -122,6 +124,16 @@ def test_infinite_retry(self): assert self.actual_attempts == 5 assert self.actual_failures == 5 + @pytest.mark.parametrize("exception_class", [ConnectionError, TimeoutError]) + def test_is_supported_error_true(self, exception_class): + retry = Retry(BackoffMock(), -1) + assert retry.is_supported_error(exception_class()) + + @pytest.mark.parametrize("exception_class", [RedisClusterException, RedisError]) + def test_is_supported_error_false(self, exception_class): + retry = Retry(BackoffMock(), -1) + assert not retry.is_supported_error(exception_class()) + @pytest.mark.onlynoncluster class TestRedisClientRetry: