Skip to content

Commit

Permalink
Reuse the old nodes' connections when a cluster topology refresh is being done (#2235)
Browse files Browse the repository at this point in the history

* A fix was made to reuse the old nodes' connections when a cluster topology refresh is being done

* Fixed RedisCluster to immediately raise AuthenticationError

* Updated CHANGES

* Fixed cluster async bgsave test to ignore "bgsave already in progress" error

* Fixed linters
  • Loading branch information
barshaul authored Jun 23, 2022
1 parent 23fd327 commit 6da8086
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 14 deletions.
2 changes: 2 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
* Fix broken connection writer lock-up for asyncio (#2065)
* Fix auth bug when provided with no username (#2086)
* Fix missing ClusterPipeline._lock (#2189)
* Fix reusing the old nodes' connections when cluster topology refresh is being done
* Fix RedisCluster to immediately raise AuthenticationError without a retry

* 4.1.3 (Feb 8, 2022)
* Fix flushdb and flushall (#1926)
Expand Down
37 changes: 26 additions & 11 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.exceptions import (
AskError,
AuthenticationError,
BusyLoadingError,
ClusterCrossSlotError,
ClusterDownError,
Expand Down Expand Up @@ -1113,7 +1114,7 @@ def _execute_command(self, target_node, *args, **kwargs):
)
return response

except (RedisClusterException, BusyLoadingError) as e:
except (RedisClusterException, BusyLoadingError, AuthenticationError) as e:
log.exception(type(e))
raise
except (ConnectionError, TimeoutError) as e:
Expand All @@ -1134,6 +1135,7 @@ def _execute_command(self, target_node, *args, **kwargs):
else:
# Hard force of reinitialize of the node/slots setup
# and try again with the new setup
target_node.redis_connection = None
self.nodes_manager.initialize()
raise
except MovedError as e:
Expand Down Expand Up @@ -1443,6 +1445,21 @@ def create_redis_node(self, host, port, **kwargs):
r = Redis(host=host, port=port, **kwargs)
return r

def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
    """Return a ClusterNode for (host, port), reusing cached nodes when possible.

    Lookup order:
      1. ``tmp_nodes_cache`` — nodes already discovered during the current
         topology refresh.
      2. ``self.nodes_cache`` — the previous topology's node, but only when it
         still holds a live redis connection, so the connection is reused
         instead of being rebuilt.
      3. A brand-new ClusterNode when nothing reusable exists.
    """
    node_name = get_node_name(host, port)
    # Prefer a node already seen during this refresh pass
    node = tmp_nodes_cache.get(node_name)
    if node is not None:
        return node
    # Fall back to the old topology's node to reuse its connection
    node = self.nodes_cache.get(node_name)
    if node is not None and node.redis_connection is not None:
        return node
    # Nothing reusable — create a fresh cluster node
    return ClusterNode(host, port, role)

def initialize(self):
"""
Initializes the nodes cache, slots cache and redis connections.
Expand Down Expand Up @@ -1521,14 +1538,14 @@ def initialize(self):

for slot in cluster_slots:
primary_node = slot[2]
host = primary_node[0]
host = str_if_bytes(primary_node[0])
if host == "":
host = startup_node.host
port = int(primary_node[1])

target_node = tmp_nodes_cache.get(get_node_name(host, port))
if target_node is None:
target_node = ClusterNode(host, port, PRIMARY)
target_node = self._get_or_create_cluster_node(
host, port, PRIMARY, tmp_nodes_cache
)
# add this node to the nodes cache
tmp_nodes_cache[target_node.name] = target_node

Expand All @@ -1539,14 +1556,12 @@ def initialize(self):
replica_nodes = [slot[j] for j in range(3, len(slot))]

for replica_node in replica_nodes:
host = replica_node[0]
host = str_if_bytes(replica_node[0])
port = replica_node[1]

target_replica_node = tmp_nodes_cache.get(
get_node_name(host, port)
target_replica_node = self._get_or_create_cluster_node(
host, port, REPLICA, tmp_nodes_cache
)
if target_replica_node is None:
target_replica_node = ClusterNode(host, port, REPLICA)
tmp_slots[i].append(target_replica_node)
# add this node to the nodes cache
tmp_nodes_cache[
Expand Down Expand Up @@ -1598,7 +1613,7 @@ def initialize(self):
# Set the default node
self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
# Populate the startup nodes with all discovered nodes
self.populate_startup_nodes(self.nodes_cache.values())
self.startup_nodes = tmp_nodes_cache
# If initialize was called after a MovedError, clear it
self._moved_exception = None

Expand Down
10 changes: 7 additions & 3 deletions tests/test_asyncio/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1059,9 +1059,13 @@ async def test_readwrite(self) -> None:

@skip_if_redis_enterprise()
async def test_bgsave(self, r: RedisCluster) -> None:
    """BGSAVE twice against the cluster, tolerating a save already running.

    A concurrent "Background save already in progress" ResponseError is not
    a real failure (another test or the server itself may have started one),
    so that specific error is swallowed; anything else is re-raised.
    """
    try:
        assert await r.bgsave()
        await asyncio.sleep(0.3)
        assert await r.bgsave(True)
    except ResponseError as e:
        # str(e) instead of calling the dunder e.__str__() directly
        if "Background save already in progress" not in str(e):
            raise

async def test_info(self, r: RedisCluster) -> None:
# Map keys to same slot
Expand Down
40 changes: 40 additions & 0 deletions tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
RedisClusterException,
RedisError,
ResponseError,
TimeoutError,
)
from redis.utils import str_if_bytes
from tests.test_pubsub import wait_for_message
Expand Down Expand Up @@ -651,6 +652,45 @@ def test_not_require_full_coverage_cluster_down_error(self, r):
else:
raise e

def test_timeout_error_topology_refresh_reuse_connections(self, r):
    """
    By mocking TIMEOUT errors, we'll force the cluster topology to be
    reinitialized, and then ensure that only the impacted connection is
    replaced while every other node's connection is reused.
    """
    node = r.get_node_from_key("key")
    r.set("key", "value")
    # Snapshot each node's redis connection before the refresh is triggered
    node_conn_origin = {}
    for n in r.get_nodes():
        node_conn_origin[n.name] = n.redis_connection
    # Keep the real parse_response so the mock can restore it later
    real_func = r.get_redis_connection(node).parse_response

    # Mutable holder so the side-effect closure below can count invocations
    class counter:
        def __init__(self, val=0):
            self.val = int(val)

    count = counter(0)
    with patch.object(Redis, "parse_response") as parse_response:

        def moved_redirect_effect(connection, *args, **options):
            # raise a timeout 5 times so we'll need to reinitialize the topology
            if count.val >= 5:
                # After 5 timeouts, restore real parsing so the retry succeeds
                parse_response.side_effect = real_func
            count.val += 1
            raise TimeoutError()

        parse_response.side_effect = moved_redirect_effect
        assert r.get("key") == b"value"
        for node_name, conn in node_conn_origin.items():
            if node_name == node.name:
                # The old redis connection of the timed out node should have been
                # deleted and replaced
                assert conn != r.get_redis_connection(node)
            else:
                # other nodes' redis connection should have been reused during the
                # topology refresh
                cur_node = r.get_node(node_name=node_name)
                assert conn == r.get_redis_connection(cur_node)


@pytest.mark.onlycluster
class TestClusterRedisCommands:
Expand Down

0 comments on commit 6da8086

Please sign in to comment.