[GROW-3361] always release, instead of disconnect, when error occurs during get_connection #11

Merged 3 commits on Aug 24, 2023
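For context, a minimal, hypothetical sketch (not taken from this PR) of the distinction in the title: ConnectionPool.release() hands a connection back to the pool with its socket still open, while Connection.disconnect() closes the socket and forces a later reconnect. The host, port, and PING command below are illustrative assumptions.

# Hypothetical illustration only; not part of the diff.
from redis import ConnectionPool

pool = ConnectionPool(host="localhost", port=6379)
conn = pool.get_connection("PING")
try:
    conn.send_command("PING")
    print(conn.read_response())  # b'PONG'
finally:
    # Releasing keeps the socket alive so the pool can hand it out again;
    # calling conn.disconnect() first would force the next caller to
    # re-establish the connection.
    pool.release(conn)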
redis/cluster.py: 17 changes (9 additions, 8 deletions)
@@ -1103,7 +1103,7 @@ def execute_command(self, *args, **kwargs):
                 # The nodes and slots cache were reinitialized.
                 # Try again with the new cluster setup.
                 retry_attempts -= 1
-                if self.retry and isinstance(e, self.retry._supported_errors):
+                if self.retry and self.retry.is_supported_error(e):
                     backoff = self.retry._backoff.compute(
                         self.cluster_error_retry_attempts - retry_attempts
                     )
@@ -2034,20 +2034,19 @@ def _send_cluster_commands(
                     redis_node = self.get_redis_connection(node)
                     try:
                         connection = get_connection(redis_node, c.args)
-                    except (ConnectionError, TimeoutError) as e:
+                    except BaseException as e:
                         for n in nodes.values():
                             n.connection_pool.release(n.connection)
                             n.connection = None
                         nodes = {}
-                        if self.retry and isinstance(
-                            e, self.retry._supported_errors
-                        ):
+                        if self.retry and self.retry.is_supported_error(e):
                             backoff = self.retry._backoff.compute(attempts_count)
                             if backoff > 0:
                                 time.sleep(backoff)
-                        self.nodes_manager.initialize()
-                        if is_default_node:
-                            self.replace_default_node()
+                        if isinstance(e, (ConnectionError, TimeoutError)):
+                            self.nodes_manager.initialize()
+                            if is_default_node:
+                                self.replace_default_node()
                         raise
                     nodes[node_name] = NodeCommands(
                         redis_node.parse_response,
@@ -2163,6 +2162,8 @@ def _send_cluster_commands(
                 if n.connection:
                     n.connection.disconnect()
                     n.connection_pool.release(n.connection)
+            if len(nodes) > 0:
+                time.sleep(0.25)
             raise

     def _fail_on_redirect(self, allow_redirections):

Review comment on the added time.sleep(0.25): Just curious, is there any reason for the delay after disconnecting the connections?

@zach-iee (author), Aug 23, 2023: This code is a safety feature to catch any connection leaks, but it comes at the cost of disconnecting all of the connections used in this function. Ideally this block is never executed; if it does run because of an unknown error, there is probably a problem with Soda or Redis. Reconnecting can put extra load on a Redis node, so I thought a static backoff could alleviate the load on the Redis CPU.
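For reference, a hedged usage sketch of the code path these hunks change: a cluster client built with a Retry policy, whose backoff is what self.retry._backoff.compute(...) consults when get_connection fails inside a pipeline. The host, port, and retry count are assumptions for illustration.

# Usage sketch with assumed connection details; not part of the diff.
from redis.backoff import ExponentialBackoff
from redis.cluster import RedisCluster
from redis.retry import Retry

rc = RedisCluster(
    host="localhost",
    port=7000,
    retry=Retry(ExponentialBackoff(), retries=3),
)

pipe = rc.pipeline()
pipe.get("a")
pipe.get("b")
# With this PR, an error while acquiring a node connection makes the
# pipeline release every connection it already holds (instead of
# disconnecting them) before backing off and re-raising.
print(pipe.execute())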
redis/retry.py: 3 changes (3 additions, 0 deletions)
@@ -32,6 +32,9 @@ def update_supported_errors(self, specified_errors: list):
             set(self._supported_errors + tuple(specified_errors))
         )
 
+    def is_supported_error(self, error):
+        return isinstance(error, self._supported_errors)
+
     def call_with_retry(self, do, fail):
         """
         Execute an operation that might fail and returns its result, or
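A minimal sketch of the new helper in isolation (NoBackoff and the exception classes are existing redis-py names; the retry count is arbitrary): it reports whether an exception instance matches the Retry object's supported error types.

# Minimal sketch of the Retry.is_supported_error helper added above.
from redis.backoff import NoBackoff
from redis.exceptions import ConnectionError, RedisError
from redis.retry import Retry

retry = Retry(NoBackoff(), retries=1)

# ConnectionError is in the default supported_errors tuple, so it is retryable.
assert retry.is_supported_error(ConnectionError("node unreachable"))

# A generic RedisError is not, so callers surface it immediately.
assert not retry.is_supported_error(RedisError("bad reply"))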
tests/test_cluster.py: 49 changes (45 additions, 4 deletions)
@@ -2805,8 +2805,10 @@ def raise_error():
 
             m.side_effect = raise_error
 
-            with pytest.raises(Exception, match="unexpected error"):
-                r.pipeline().get("a").execute()
+            with patch.object(Connection, "disconnect") as d:
+                with pytest.raises(Exception, match="unexpected error"):
+                    r.pipeline().get("a").execute()
+                assert d.call_count == 1
 
             for cluster_node in r.nodes_manager.nodes_cache.values():
                 connection_pool = cluster_node.redis_connection.connection_pool
@@ -3127,7 +3129,7 @@ def raise_ask_error():
             assert res == ["MOCK_OK"]
 
     @pytest.mark.parametrize("error", [ConnectionError, TimeoutError])
-    def test_return_previous_acquired_connections(self, r, error):
+    def test_return_previous_acquired_connections_with_retry(self, r, error):
         # in order to ensure that a pipeline will make use of connections
         # from different nodes
         assert r.keyslot("a") != r.keyslot("b")
@@ -3143,7 +3145,13 @@ def raise_error(target_node, *args, **kwargs):
 
             get_connection.side_effect = raise_error
 
-            r.pipeline().get("a").get("b").execute()
+            with patch.object(NodesManager, "initialize") as i:
+                # in order to remove disconnect caused by initialize
+                i.side_effect = lambda: None
+
+                with patch.object(Connection, "disconnect") as d:
+                    r.pipeline().get("a").get("b").execute()
+                    assert d.call_count == 0
 
         # there should have been two get_connections per execution and
         # two executions due to exception raised in the first execution
@@ -3153,6 +3161,39 @@ def raise_error(target_node, *args, **kwargs):
             num_of_conns = len(connection_pool._available_connections)
             assert num_of_conns == connection_pool._created_connections
 
+    @pytest.mark.parametrize("error", [RedisClusterException, BaseException])
+    def test_return_previous_acquired_connections_without_retry(self, r, error):
+        # in order to ensure that a pipeline will make use of connections
+        # from different nodes
+        assert r.keyslot("a") != r.keyslot("b")
+
+        orig_func = redis.cluster.get_connection
+        with patch("redis.cluster.get_connection") as get_connection:
+
+            def raise_error(target_node, *args, **kwargs):
+                if get_connection.call_count == 2:
+                    raise error("mocked error")
+                else:
+                    return orig_func(target_node, *args, **kwargs)
+
+            get_connection.side_effect = raise_error
+
+            with patch.object(Connection, "disconnect") as d:
+                with pytest.raises(error):
+                    r.pipeline().get("a").get("b").execute()
+                assert d.call_count == 0
+
+        # there should have been two get_connections per execution and
+        # two executions due to exception raised in the first execution
+        assert get_connection.call_count == 2
+        for cluster_node in r.nodes_manager.nodes_cache.values():
+            connection_pool = cluster_node.redis_connection.connection_pool
+            num_of_conns = len(connection_pool._available_connections)
+            assert num_of_conns == connection_pool._created_connections
+            # connection must remain connected
+            for conn in connection_pool._available_connections:
+                assert conn._sock is not None
+
     def test_empty_stack(self, r):
         """
         If pipeline is executed with no commands it should
tests/test_retry.py: 12 changes (12 additions, 0 deletions)
@@ -9,6 +9,8 @@
     BusyLoadingError,
     ConnectionError,
     ReadOnlyError,
+    RedisClusterException,
+    RedisError,
     TimeoutError,
 )
 from redis.retry import Retry
@@ -122,6 +124,16 @@ def test_infinite_retry(self):
         assert self.actual_attempts == 5
         assert self.actual_failures == 5
 
+    @pytest.mark.parametrize("exception_class", [ConnectionError, TimeoutError])
+    def test_is_supported_error_true(self, exception_class):
+        retry = Retry(BackoffMock(), -1)
+        assert retry.is_supported_error(exception_class())
+
+    @pytest.mark.parametrize("exception_class", [RedisClusterException, RedisError])
+    def test_is_supported_error_false(self, exception_class):
+        retry = Retry(BackoffMock(), -1)
+        assert not retry.is_supported_error(exception_class())
+
 
 @pytest.mark.onlynoncluster
 class TestRedisClientRetry: