Skip to content

Commit

Permalink
release already acquired connections on ClusterPipeline, when get_con…
Browse files Browse the repository at this point in the history
…nection raises an exception (#3133)

Signed-off-by: zach.lee <[email protected]>
  • Loading branch information
zakaf authored and dvora-h committed Feb 25, 2024
1 parent 3f23fff commit 101a28d
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
2 changes: 2 additions & 0 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2130,6 +2130,8 @@ def _send_cluster_commands(
try:
connection = get_connection(redis_node, c.args)
except ConnectionError:
for n in nodes.values():
n.connection_pool.release(n.connection)
# Connection retries are being handled in the node's
# Retry object. Reinitialize the node -> slot table.
self.nodes_manager.initialize()
Expand Down
26 changes: 26 additions & 0 deletions tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from unittest.mock import DEFAULT, Mock, call, patch

import pytest
import redis
from redis import Redis
from redis._parsers import CommandsParser
from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff
Expand Down Expand Up @@ -3247,6 +3248,31 @@ def raise_ask_error():
assert ask_node.redis_connection.connection.read_response.called
assert res == ["MOCK_OK"]

def test_return_previously_acquired_connections(self, r):
# in order to ensure that a pipeline will make use of connections
# from different nodes
assert r.keyslot("a") != r.keyslot("b")

orig_func = redis.cluster.get_connection
with patch("redis.cluster.get_connection") as get_connection:

def raise_error(target_node, *args, **kwargs):
if get_connection.call_count == 2:
raise ConnectionError("mocked error")
else:
return orig_func(target_node, *args, **kwargs)

get_connection.side_effect = raise_error

r.pipeline().get("a").get("b").execute()

# 4 = 2 get_connections per execution * 2 executions
assert get_connection.call_count == 4
for cluster_node in r.nodes_manager.nodes_cache.values():
connection_pool = cluster_node.redis_connection.connection_pool
num_of_conns = len(connection_pool._available_connections)
assert num_of_conns == connection_pool._created_connections

def test_empty_stack(self, r):
"""
If pipeline is executed with no commands it should
Expand Down

0 comments on commit 101a28d

Please sign in to comment.