From 35c4775b2966c1ef76f621c3722462a4aabdf3fc Mon Sep 17 00:00:00 2001 From: barshaul Date: Fri, 13 Sep 2024 13:41:44 +0000 Subject: [PATCH 1/2] Changed refresh_connections to release the connections container lock while creating a new connection --- redis/src/cluster_async/mod.rs | 93 +++++++++++++++++++--------------- 1 file changed, 51 insertions(+), 42 deletions(-) diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 965a05cf8..67baca3f1 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -1303,50 +1303,59 @@ where check_existing_conn: bool, ) { info!("Started refreshing connections to {:?}", addresses); - let connections_container = inner.conn_lock.read().await; - let cluster_params = &inner.cluster_params; - let subscriptions_by_address = &inner.subscriptions_by_address; - let glide_connection_optons = &inner.glide_connection_options; + let mut tasks = FuturesUnordered::new(); + let inner = inner.clone(); - stream::iter(addresses.into_iter()) - .fold( - &*connections_container, - |connections_container, address| async move { - let node_option = if check_existing_conn { - connections_container.remove_node(&address) - } else { - None - }; + for address in addresses.into_iter() { + let inner = inner.clone(); - // override subscriptions for this connection - let mut cluster_params = cluster_params.clone(); - let subs_guard = subscriptions_by_address.read().await; - cluster_params.pubsub_subscriptions = subs_guard.get(&address).cloned(); - drop(subs_guard); - let node = get_or_create_conn( - &address, - node_option, - &cluster_params, - conn_type, - glide_connection_optons.clone(), - ) - .await; - match node { - Ok(node) => { - connections_container - .replace_or_add_connection_for_address(address, node); - } - Err(err) => { - warn!( - "Failed to refresh connection for node {}. Error: `{:?}`", - address, err - ); - } - } - connections_container - }, - ) - .await; + tasks.push(async move { + let connections_container = inner.conn_lock.read().await; + let cluster_params = &inner.cluster_params; + let subscriptions_by_address = &inner.subscriptions_by_address; + let glide_connection_options = &inner.glide_connection_options; + + let node_option = if check_existing_conn { + connections_container.remove_node(&address) + } else { + None + }; + + // Override subscriptions for this connection + let mut cluster_params = cluster_params.clone(); + let subs_guard = subscriptions_by_address.read().await; + cluster_params.pubsub_subscriptions = subs_guard.get(&address).cloned(); + drop(subs_guard); + drop(connections_container); + + let node = get_or_create_conn( + &address, + node_option, + &cluster_params, + conn_type, + glide_connection_options.clone(), + ) + .await; + + (address, node) + }); + } + + // Poll connection tasks as soon as each one finishes + while let Some(result) = tasks.next().await { + match result { + (address, Ok(node)) => { + let connections_container = inner.conn_lock.read().await; + connections_container.replace_or_add_connection_for_address(address, node); + } + (address, Err(err)) => { + warn!( + "Failed to refresh connection for node {}. Error: `{:?}`", + address, err + ); + } + } + } info!("refresh connections completed"); } From ec85a7b2dc842126ea81865f8cb694f373e251d1 Mon Sep 17 00:00:00 2001 From: barshaul Date: Tue, 1 Oct 2024 15:03:33 +0000 Subject: [PATCH 2/2] Fixed PR comments --- redis/src/cluster_async/mod.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 67baca3f1..d5dba5fe0 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -1310,30 +1310,25 @@ where let inner = inner.clone(); tasks.push(async move { - let connections_container = inner.conn_lock.read().await; - let cluster_params = &inner.cluster_params; - let subscriptions_by_address = &inner.subscriptions_by_address; - let glide_connection_options = &inner.glide_connection_options; - let node_option = if check_existing_conn { + let connections_container = inner.conn_lock.read().await; connections_container.remove_node(&address) } else { None }; // Override subscriptions for this connection - let mut cluster_params = cluster_params.clone(); - let subs_guard = subscriptions_by_address.read().await; + let mut cluster_params = inner.cluster_params.clone(); + let subs_guard = inner.subscriptions_by_address.read().await; cluster_params.pubsub_subscriptions = subs_guard.get(&address).cloned(); drop(subs_guard); - drop(connections_container); let node = get_or_create_conn( &address, node_option, &cluster_params, conn_type, - glide_connection_options.clone(), + inner.glide_connection_options.clone(), ) .await; @@ -1356,7 +1351,7 @@ where } } } - info!("refresh connections completed"); + debug!("refresh connections completed"); } async fn aggregate_results(