-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Release the read lock while creating connections inrefresh_connections
#191
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1303,50 +1303,59 @@ where | |
check_existing_conn: bool, | ||
) { | ||
info!("Started refreshing connections to {:?}", addresses); | ||
let connections_container = inner.conn_lock.read().await; | ||
let cluster_params = &inner.cluster_params; | ||
let subscriptions_by_address = &inner.subscriptions_by_address; | ||
let glide_connection_optons = &inner.glide_connection_options; | ||
let mut tasks = FuturesUnordered::new(); | ||
let inner = inner.clone(); | ||
|
||
stream::iter(addresses.into_iter()) | ||
.fold( | ||
&*connections_container, | ||
|connections_container, address| async move { | ||
let node_option = if check_existing_conn { | ||
connections_container.remove_node(&address) | ||
} else { | ||
None | ||
}; | ||
for address in addresses.into_iter() { | ||
let inner = inner.clone(); | ||
|
||
// override subscriptions for this connection | ||
let mut cluster_params = cluster_params.clone(); | ||
let subs_guard = subscriptions_by_address.read().await; | ||
cluster_params.pubsub_subscriptions = subs_guard.get(&address).cloned(); | ||
drop(subs_guard); | ||
let node = get_or_create_conn( | ||
&address, | ||
node_option, | ||
&cluster_params, | ||
conn_type, | ||
glide_connection_optons.clone(), | ||
) | ||
.await; | ||
match node { | ||
Ok(node) => { | ||
connections_container | ||
.replace_or_add_connection_for_address(address, node); | ||
} | ||
Err(err) => { | ||
warn!( | ||
"Failed to refresh connection for node {}. Error: `{:?}`", | ||
address, err | ||
); | ||
} | ||
} | ||
connections_container | ||
}, | ||
) | ||
.await; | ||
tasks.push(async move { | ||
let connections_container = inner.conn_lock.read().await; | ||
let cluster_params = &inner.cluster_params; | ||
let subscriptions_by_address = &inner.subscriptions_by_address; | ||
let glide_connection_options = &inner.glide_connection_options; | ||
barshaul marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
let node_option = if check_existing_conn { | ||
connections_container.remove_node(&address) | ||
} else { | ||
None | ||
}; | ||
|
||
// Override subscriptions for this connection | ||
let mut cluster_params = cluster_params.clone(); | ||
let subs_guard = subscriptions_by_address.read().await; | ||
cluster_params.pubsub_subscriptions = subs_guard.get(&address).cloned(); | ||
drop(subs_guard); | ||
drop(connections_container); | ||
|
||
let node = get_or_create_conn( | ||
&address, | ||
node_option, | ||
&cluster_params, | ||
conn_type, | ||
glide_connection_options.clone(), | ||
barshaul marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) | ||
.await; | ||
|
||
(address, node) | ||
}); | ||
} | ||
|
||
// Poll connection tasks as soon as each one finishes | ||
while let Some(result) = tasks.next().await { | ||
match result { | ||
(address, Ok(node)) => { | ||
let connections_container = inner.conn_lock.read().await; | ||
connections_container.replace_or_add_connection_for_address(address, node); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should expose API for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as above |
||
} | ||
(address, Err(err)) => { | ||
warn!( | ||
"Failed to refresh connection for node {}. Error: `{:?}`", | ||
address, err | ||
); | ||
} | ||
} | ||
} | ||
info!("refresh connections completed"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this something that happens often? if it does, please move this to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. changed |
||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Making a lock "public" is not a good idea. We should an atomic API and not the lock itself.
For example:
if the "something" is complex, we should add an API:
this way we have a full control over the lock and we can avoid misuse of the lock
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that's a good idea, lets do it in a seperate PR