Skip to content

Commit

Permalink
[GROW-3361] always release, instead of disconnect, when error occurs …
Browse files Browse the repository at this point in the history
…during get_connection (#11)

* add is_supported_error() to retry

* release, instead of disconnect on any error, when fetching connections in cluster pipeline

* add a default backoff after cluster pipeline disconnects its connections

(cherry picked from commit 179891d)
  • Loading branch information
zach-iee committed Aug 24, 2023
1 parent 1f75b91 commit 68ef50a
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 12 deletions.
17 changes: 9 additions & 8 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,7 @@ def execute_command(self, *args, **kwargs):
# The nodes and slots cache were reinitialized.
# Try again with the new cluster setup.
retry_attempts -= 1
if self.retry and isinstance(e, self.retry._supported_errors):
if self.retry and self.retry.is_supported_error(e):
backoff = self.retry._backoff.compute(
self.cluster_error_retry_attempts - retry_attempts
)
Expand Down Expand Up @@ -2100,20 +2100,19 @@ def _send_cluster_commands(
redis_node = self.get_redis_connection(node)
try:
connection = get_connection(redis_node, c.args)
except (ConnectionError, TimeoutError) as e:
except BaseException as e:
for n in nodes.values():
n.connection_pool.release(n.connection)
n.connection = None
nodes = {}
if self.retry and isinstance(
e, self.retry._supported_errors
):
if self.retry and self.retry.is_supported_error(e):
backoff = self.retry._backoff.compute(attempts_count)
if backoff > 0:
time.sleep(backoff)
self.nodes_manager.initialize()
if is_default_node:
self.replace_default_node()
if isinstance(e, (ConnectionError, TimeoutError)):
self.nodes_manager.initialize()
if is_default_node:
self.replace_default_node()
raise
nodes[node_name] = NodeCommands(
redis_node.parse_response,
Expand Down Expand Up @@ -2229,6 +2228,8 @@ def _send_cluster_commands(
if n.connection:
n.connection.disconnect()
n.connection_pool.release(n.connection)
if len(nodes) > 0:
time.sleep(0.25)
raise

def _fail_on_redirect(self, allow_redirections):
Expand Down
3 changes: 3 additions & 0 deletions redis/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ def update_supported_errors(self, specified_errors: list):
set(self._supported_errors + tuple(specified_errors))
)

def is_supported_error(self, error):
return isinstance(error, self._supported_errors)

def call_with_retry(self, do, fail):
"""
Execute an operation that might fail and returns its result, or
Expand Down
49 changes: 45 additions & 4 deletions tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2681,8 +2681,10 @@ def raise_error():

m.side_effect = raise_error

with pytest.raises(Exception, match="unexpected error"):
r.pipeline().get("a").execute()
with patch.object(Connection, "disconnect") as d:
with pytest.raises(Exception, match="unexpected error"):
r.pipeline().get("a").execute()
assert d.call_count == 1

for cluster_node in r.nodes_manager.nodes_cache.values():
connection_pool = cluster_node.redis_connection.connection_pool
Expand Down Expand Up @@ -2984,7 +2986,7 @@ def raise_ask_error():
assert res == ["MOCK_OK"]

@pytest.mark.parametrize("error", [ConnectionError, TimeoutError])
def test_return_previous_acquired_connections(self, r, error):
def test_return_previous_acquired_connections_with_retry(self, r, error):
# in order to ensure that a pipeline will make use of connections
# from different nodes
assert r.keyslot("a") != r.keyslot("b")
Expand All @@ -3000,7 +3002,13 @@ def raise_error(target_node, *args, **kwargs):

get_connection.side_effect = raise_error

r.pipeline().get("a").get("b").execute()
with patch.object(NodesManager, "initialize") as i:
# in order to remove disconnect caused by initialize
i.side_effect = lambda: None

with patch.object(Connection, "disconnect") as d:
r.pipeline().get("a").get("b").execute()
assert d.call_count == 0

# there should have been two get_connections per execution and
# two executions due to exception raised in the first execution
Expand All @@ -3010,6 +3018,39 @@ def raise_error(target_node, *args, **kwargs):
num_of_conns = len(connection_pool._available_connections)
assert num_of_conns == connection_pool._created_connections

@pytest.mark.parametrize("error", [RedisClusterException, BaseException])
def test_return_previous_acquired_connections_without_retry(self, r, error):
# in order to ensure that a pipeline will make use of connections
# from different nodes
assert r.keyslot("a") != r.keyslot("b")

orig_func = redis.cluster.get_connection
with patch("redis.cluster.get_connection") as get_connection:

def raise_error(target_node, *args, **kwargs):
if get_connection.call_count == 2:
raise error("mocked error")
else:
return orig_func(target_node, *args, **kwargs)

get_connection.side_effect = raise_error

with patch.object(Connection, "disconnect") as d:
with pytest.raises(error):
r.pipeline().get("a").get("b").execute()
assert d.call_count == 0

# there should have been two get_connections per execution and
# two executions due to exception raised in the first execution
assert get_connection.call_count == 2
for cluster_node in r.nodes_manager.nodes_cache.values():
connection_pool = cluster_node.redis_connection.connection_pool
num_of_conns = len(connection_pool._available_connections)
assert num_of_conns == connection_pool._created_connections
# connection must remain connected
for conn in connection_pool._available_connections:
assert conn._sock is not None

def test_empty_stack(self, r):
"""
If pipeline is executed with no commands it should
Expand Down
12 changes: 12 additions & 0 deletions tests/test_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
BusyLoadingError,
ConnectionError,
ReadOnlyError,
RedisClusterException,
RedisError,
TimeoutError,
)
from redis.retry import Retry
Expand Down Expand Up @@ -122,6 +124,16 @@ def test_infinite_retry(self):
assert self.actual_attempts == 5
assert self.actual_failures == 5

@pytest.mark.parametrize("exception_class", [ConnectionError, TimeoutError])
def test_is_supported_error_true(self, exception_class):
retry = Retry(BackoffMock(), -1)
assert retry.is_supported_error(exception_class())

@pytest.mark.parametrize("exception_class", [RedisClusterException, RedisError])
def test_is_supported_error_false(self, exception_class):
retry = Retry(BackoffMock(), -1)
assert not retry.is_supported_error(exception_class())


@pytest.mark.onlynoncluster
class TestRedisClientRetry:
Expand Down

0 comments on commit 68ef50a

Please sign in to comment.