Skip to content

Commit

Permalink
review(sami): split up the background_task of the connection manager
Browse files Browse the repository at this point in the history
  • Loading branch information
mariocynicys committed Oct 20, 2024
1 parent 2457b96 commit 24a64e3
Showing 1 changed file with 98 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -334,112 +334,120 @@ impl ConnectionManager {
/// Runs the connection manager's housekeeping loop.
///
/// Every iteration selects the candidate connections, tries to establish them, and then either waits
/// (for `BACKGROUND_TASK_WAIT_TIMEOUT` or a below-`min_connected` notification) or, when still below
/// `min_connected`, sleeps briefly and retries right away.
///
/// NOTE(review): this listing is a rendered diff without +/- markers, so it shows BOTH the removed inline
/// logic (together with the old `retrying_endlessly` flag) and the added calls to
/// `get_candidate_connections` / `establish_best_connections` (together with the new `log_errors` flag).
/// It is not meant to compile as-is.
async fn background_task(self) {
// Take out the min_connected notifiee from the manager.
// NOTE(review): `unwrap_or_return!` presumably returns from this function on failure - confirm against the macro.
let mut min_connected_notification = unwrap_or_return!(self.extract_below_min_connected_notifiee());
// A flag to indicate whether we are in panic mode due to failing to maintain the minimum number of connections.
// If so, we shouldn't log a lot of errors to not flood the logs.
let mut retrying_endlessly = false;
// A flag to indicate whether to log connection establishment errors or not. We should not log them if we
// are in panic mode (i.e. we are below the `min_connected` threshold) as this will flood the error log.
let mut log_errors = true;
loop {
// Get the candidate connections that we will consider maintaining.
let (will_never_get_min_connected, mut candidate_connections) = {
let all_connections = self.read_connections();
let maintained_connections = self.read_maintained_connections();
// The number of connections we need to add as maintained to reach the `min_connected` threshold.
let connections_needed = self.config().min_connected.saturating_sub(maintained_connections.len());
// The connections that we can consider (all connections minus the already maintained ones).
let all_candidate_connections: Vec<_> = all_connections
.iter()
.filter_map(|(_, conn_ctx)| {
(!maintained_connections.contains_key(&conn_ctx.id))
.then(|| (conn_ctx.connection.clone(), conn_ctx.id))
})
.collect();
// The candidate connections from above, but further filtered by whether they are suspended or not.
// A candidate whose address is no longer found in `all_connections` is treated as suspended (filtered out).
let non_suspended_candidate_connections: Vec<_> = all_candidate_connections
.iter()
.filter(|(connection, _)| {
all_connections
.get(connection.address())
.map_or(false, |conn_ctx| now_ms() > conn_ctx.suspended_till())
})
.cloned()
.collect();
// Decide which candidate connections to consider (all or only non-suspended).
if connections_needed > non_suspended_candidate_connections.len() {
if connections_needed > all_candidate_connections.len() {
// Not enough connections to cover the `min_connected` threshold.
// This means we will never be able to maintain `min_connected` active connections.
(true, all_candidate_connections)
} else {
// If we consider all candidate connection (but some are suspended), we can cover the needed connections.
// We will consider the suspended ones since if we don't we will stay below `min_connected` threshold.
(false, all_candidate_connections)
}
} else {
// Non suspended candidates are enough to cover the needed connections.
(false, non_suspended_candidate_connections)
}
};

// Post-refactor replacement of the inline selection block above (note the swapped tuple order).
let (candidate_connections, will_never_get_min_connected) = self.get_candidate_connections();
// Establish the connections to the selected candidates and alter the maintained connections set accordingly.
{
let client = unwrap_or_return!(self.get_client());
// Sort the candidate connections by their priority/ID.
candidate_connections.sort_by_key(|(_, priority)| *priority);
for (connection, connection_id) in candidate_connections {
let address = connection.address().to_string();
// Snapshot the maintained set inside a short scope so the read guard is released before awaiting below.
let (maintained_connections_size, lowest_priority_connection_id) = {
let maintained_connections = self.read_maintained_connections();
let maintained_connections_size = maintained_connections.len();
let lowest_priority_connection_id =
*maintained_connections.keys().next_back().unwrap_or(&u32::MAX);
(maintained_connections_size, lowest_priority_connection_id)
};

// We can only try to add the connection if:
// 1- We haven't reached the `max_connected` threshold.
// 2- We have reached the `max_connected` threshold but the connection has a higher priority than the lowest priority connection.
if maintained_connections_size < self.config().max_connected
|| connection_id < lowest_priority_connection_id
{
// Now that we know the connection is good to be inserted, try to establish it.
if let Err(e) = connection.establish_connection_loop(client.clone()).await {
if !retrying_endlessly {
error!("Failed to establish connection to {address} due to error: {e:?}");
}
// Remove the connection if it's not recoverable.
if !e.is_recoverable() {
self.remove_connection(&address).ok();
}
continue;
}
self.maintain(connection_id, address);
} else {
// If any of the two conditions on the `if` statement above are not met, there is nothing to do.
// At this point we have already collected `max_connected` connections and also the current connection
// in the candidate list has a lower priority than the lowest priority maintained connection, and the next
// candidate connections as well since they are sorted by priority.
break;
}
}
}

// Post-refactor replacement of the inline establishment block above.
self.establish_best_connections(candidate_connections, log_errors).await;
// Only sleep if we successfully acquired the minimum number of connections,
// or if we know we can never maintain `min_connected` connections; there is no point of infinite non-wait looping then.
if self.read_maintained_connections().len() >= self.config().min_connected
|| will_never_get_min_connected
{
if self.read_maintained_connections().len() >= self.config().min_connected || will_never_get_min_connected {
// Wait for a timeout or a below `min_connected` notification before doing another round of house keeping.
futures::select! {
_ = Timer::sleep(BACKGROUND_TASK_WAIT_TIMEOUT).fuse() => (),
_ = min_connected_notification.wait().fuse() => (),
}
retrying_endlessly = false;
log_errors = true;
} else {
// Never sleeping can result in busy waiting, which is problematic as it might not
// give a chance to other tasks to make progress, especially in single threaded environments.
// Yield the execution to the executor to give a chance to other tasks to run.
// TODO: `yield` keyword is not supported in the current rust version, using a short sleep for now.
Timer::sleep(1.).await;
retrying_endlessly = true;
log_errors = false;
}
}
}

/// Collects the connections that are not maintained yet and may be picked up for maintaining.
///
/// The returned tuple is `(candidates, will_never_get_min_connected)`:
/// - `candidates` holds only the non-suspended unmaintained connections when those are enough to reach
///   `min_connected`; otherwise it holds every unmaintained connection, suspended or not.
/// - the flag is `true` only when even all the unmaintained connections together can't cover what is
///   missing to reach `min_connected`.
fn get_candidate_connections(&self) -> (Vec<(Arc<ElectrumConnection>, u32)>, bool) {
    let connections = self.read_connections();
    let maintained = self.read_maintained_connections();
    // How many more maintained connections are required to reach the `min_connected` threshold.
    let shortfall = self.config().min_connected.saturating_sub(maintained.len());

    // Every known connection that isn't in the maintained set (all connections minus the maintained ones).
    let mut unmaintained = Vec::new();
    for (_, ctx) in connections.iter() {
        if !maintained.contains_key(&ctx.id) {
            unmaintained.push((ctx.connection.clone(), ctx.id));
        }
    }

    // The subset of the above whose suspension period has already elapsed.
    // A candidate that can't be looked up by its address is treated as suspended.
    let mut ready = Vec::new();
    for candidate in unmaintained.iter() {
        let suspension_over = match connections.get(candidate.0.address()) {
            Some(ctx) => now_ms() > ctx.suspended_till(),
            None => false,
        };
        if suspension_over {
            ready.push(candidate.clone());
        }
    }

    // The non-suspended candidates alone are enough to cover the shortfall.
    if shortfall <= ready.len() {
        return (ready, false);
    }

    // Otherwise fall back to all the unmaintained connections (suspended ones included), since ignoring
    // them would keep us below `min_connected`. If even those can't cover the shortfall, `min_connected`
    // can never be reached with the connections we have.
    let will_never_get_min_connected = shortfall > unmaintained.len();
    (unmaintained, will_never_get_min_connected)
}

/// Tries to establish and maintain the candidate connections, best priority (lowest ID) first.
///
/// Stops as soon as `max_connected` connections are maintained and the current candidate doesn't have a
/// better priority than the worst maintained connection. Establishment errors are logged only when
/// `log_errors` is set; connections that fail with a non-recoverable error are removed.
async fn establish_best_connections(
    &self,
    mut candidate_connections: Vec<(Arc<ElectrumConnection>, u32)>,
    log_errors: bool,
) {
    let client = unwrap_or_return!(self.get_client());
    // Order the candidates by their priority/ID so the best ones are tried first.
    candidate_connections.sort_by_key(|candidate| candidate.1);

    for (connection, connection_id) in candidate_connections {
        let address = connection.address().to_string();
        // Snapshot the maintained set in a short scope so the read guard is released before awaiting.
        let (maintained_count, worst_maintained_id) = {
            let maintained = self.read_maintained_connections();
            (
                maintained.len(),
                maintained.keys().next_back().copied().unwrap_or(u32::MAX),
            )
        };

        // A candidate is worth trying only if there is still room below `max_connected`, or if it has a
        // higher priority than the lowest priority maintained connection. Once neither holds, the
        // remaining candidates can't qualify either since they are sorted by priority.
        let at_capacity = maintained_count >= self.config().max_connected;
        if at_capacity && connection_id >= worst_maintained_id {
            break;
        }

        match connection.establish_connection_loop(client.clone()).await {
            Ok(_) => {
                self.maintain(connection_id, address);
            },
            Err(e) => {
                if log_errors {
                    error!("Failed to establish connection to {address} due to error: {e:?}");
                }
                // Drop the connection altogether when the failure can't be recovered from.
                if !e.is_recoverable() {
                    self.remove_connection(&address).ok();
                }
            },
        }
    }
}
Expand Down

0 comments on commit 24a64e3

Please sign in to comment.