diff --git a/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs b/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs index 13ffc3b381..b06628fd60 100644 --- a/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs +++ b/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs @@ -334,112 +334,120 @@ impl ConnectionManager { async fn background_task(self) { // Take out the min_connected notifiee from the manager. let mut min_connected_notification = unwrap_or_return!(self.extract_below_min_connected_notifiee()); - // A flag to indicate whether we are in panic mode due to failing to maintain the minimum number of connections. - // If so, we shouldn't log a lot of errors to not flood the logs. - let mut retrying_endlessly = false; + // A flag to indicate whether to log connection establishment errors or not. We should not log them if we + // are in panic mode (i.e. we are below the `min_connected` threshold) as this will flood the error log. + let mut log_errors = true; loop { // Get the candidate connections that we will consider maintaining. - let (will_never_get_min_connected, mut candidate_connections) = { - let all_connections = self.read_connections(); - let maintained_connections = self.read_maintained_connections(); - // The number of connections we need to add as maintained to reach the `min_connected` threshold. - let connections_needed = self.config().min_connected.saturating_sub(maintained_connections.len()); - // The connections that we can consider (all connections - candidate connections). - let all_candidate_connections: Vec<_> = all_connections - .iter() - .filter_map(|(_, conn_ctx)| { - (!maintained_connections.contains_key(&conn_ctx.id)) - .then(|| (conn_ctx.connection.clone(), conn_ctx.id)) - }) - .collect(); - // The candidate connections from above, but further filtered by whether they are suspended or not. - let non_suspended_candidate_connections: Vec<_> = all_candidate_connections - .iter() - .filter(|(connection, _)| { - all_connections - .get(connection.address()) - .map_or(false, |conn_ctx| now_ms() > conn_ctx.suspended_till()) - }) - .cloned() - .collect(); - // Decide which candidate connections to consider (all or only non-suspended). - if connections_needed > non_suspended_candidate_connections.len() { - if connections_needed > all_candidate_connections.len() { - // Not enough connections to cover the `min_connected` threshold. - // This means we will never be able to maintain `min_connected` active connections. - (true, all_candidate_connections) - } else { - // If we consider all candidate connection (but some are suspended), we can cover the needed connections. - // We will consider the suspended ones since if we don't we will stay below `min_connected` threshold. - (false, all_candidate_connections) - } - } else { - // Non suspended candidates are enough to cover the needed connections. - (false, non_suspended_candidate_connections) - } - }; - + let (candidate_connections, will_never_get_min_connected) = self.get_candidate_connections(); // Establish the connections to the selected candidates and alter the maintained connections set accordingly. - { - let client = unwrap_or_return!(self.get_client()); - // Sort the candidate connections by their priority/ID. - candidate_connections.sort_by_key(|(_, priority)| *priority); - for (connection, connection_id) in candidate_connections { - let address = connection.address().to_string(); - let (maintained_connections_size, lowest_priority_connection_id) = { - let maintained_connections = self.read_maintained_connections(); - let maintained_connections_size = maintained_connections.len(); - let lowest_priority_connection_id = - *maintained_connections.keys().next_back().unwrap_or(&u32::MAX); - (maintained_connections_size, lowest_priority_connection_id) - }; - - // We can only try to add the connection if: - // 1- We haven't reached the `max_connected` threshold. - // 2- We have reached the `max_connected` threshold but the connection has a higher priority than the lowest priority connection. - if maintained_connections_size < self.config().max_connected - || connection_id < lowest_priority_connection_id - { - // Now that we know the connection is good to be inserted, try to establish it. - if let Err(e) = connection.establish_connection_loop(client.clone()).await { - if !retrying_endlessly { - error!("Failed to establish connection to {address} due to error: {e:?}"); - } - // Remove the connection if it's not recoverable. - if !e.is_recoverable() { - self.remove_connection(&address).ok(); - } - continue; - } - self.maintain(connection_id, address); - } else { - // If any of the two conditions on the `if` statement above are not met, there is nothing to do. - // At this point we have already collected `max_connected` connections and also the current connection - // in the candidate list has a lower priority than the lowest priority maintained connection, and the next - // candidate connections as well since they are sorted by priority. - break; - } - } - } - + self.establish_best_connections(candidate_connections, log_errors).await; // Only sleep if we successfully acquired the minimum number of connections, // or if we know we can never maintain `min_connected` connections; there is no point of infinite non-wait looping then. - if self.read_maintained_connections().len() >= self.config().min_connected - || will_never_get_min_connected - { + if self.read_maintained_connections().len() >= self.config().min_connected || will_never_get_min_connected { // Wait for a timeout or a below `min_connected` notification before doing another round of house keeping. futures::select! { _ = Timer::sleep(BACKGROUND_TASK_WAIT_TIMEOUT).fuse() => (), _ = min_connected_notification.wait().fuse() => (), } - retrying_endlessly = false; + log_errors = true; } else { // Never sleeping can result in busy waiting, which is problematic as it might not // give a chance to other tasks to make progress, especially in single threaded environments. // Yield the execution to the executor to give a chance to other tasks to run. // TODO: `yield` keyword is not supported in the current rust version, using a short sleep for now. Timer::sleep(1.).await; - retrying_endlessly = true; + log_errors = false; + } + } + } + + /// Returns a list of candidate connections that aren't maintained and could be considered for maintaining. + /// + /// Also returns a flag indicating whether covering `min_connected` connections is even possible: not possible when + /// `min_connected` is greater than the number of connections we have. + fn get_candidate_connections(&self) -> (Vec<(Arc, u32)>, bool) { + let all_connections = self.read_connections(); + let maintained_connections = self.read_maintained_connections(); + // The number of connections we need to add as maintained to reach the `min_connected` threshold. + let connections_needed = self.config().min_connected.saturating_sub(maintained_connections.len()); + // The connections that we can consider (all connections - candidate connections). + let all_candidate_connections: Vec<_> = all_connections + .iter() + .filter_map(|(_, conn_ctx)| { + (!maintained_connections.contains_key(&conn_ctx.id)).then(|| (conn_ctx.connection.clone(), conn_ctx.id)) + }) + .collect(); + // The candidate connections from above, but further filtered by whether they are suspended or not. + let non_suspended_candidate_connections: Vec<_> = all_candidate_connections + .iter() + .filter(|(connection, _)| { + all_connections + .get(connection.address()) + .map_or(false, |conn_ctx| now_ms() > conn_ctx.suspended_till()) + }) + .cloned() + .collect(); + // Decide which candidate connections to consider (all or only non-suspended). + if connections_needed > non_suspended_candidate_connections.len() { + if connections_needed > all_candidate_connections.len() { + // Not enough connections to cover the `min_connected` threshold. + // This means we will never be able to maintain `min_connected` active connections. + (all_candidate_connections, true) + } else { + // If we consider all candidate connection (but some are suspended), we can cover the needed connections. + // We will consider the suspended ones since if we don't we will stay below `min_connected` threshold. + (all_candidate_connections, false) + } + } else { + // Non suspended candidates are enough to cover the needed connections. + (non_suspended_candidate_connections, false) + } + } + + /// Establishes the best connections (based on priority) using the candidate connections + /// till we can't establish no more (hit the `max_connected` threshold). + async fn establish_best_connections( + &self, + mut candidate_connections: Vec<(Arc, u32)>, + log_errors: bool, + ) { + let client = unwrap_or_return!(self.get_client()); + // Sort the candidate connections by their priority/ID. + candidate_connections.sort_by_key(|(_, priority)| *priority); + for (connection, connection_id) in candidate_connections { + let address = connection.address().to_string(); + let (maintained_connections_size, lowest_priority_connection_id) = { + let maintained_connections = self.read_maintained_connections(); + let maintained_connections_size = maintained_connections.len(); + let lowest_priority_connection_id = *maintained_connections.keys().next_back().unwrap_or(&u32::MAX); + (maintained_connections_size, lowest_priority_connection_id) + }; + + // We can only try to add the connection if: + // 1- We haven't reached the `max_connected` threshold. + // 2- We have reached the `max_connected` threshold but the connection has a higher priority than the lowest priority connection. + if maintained_connections_size < self.config().max_connected + || connection_id < lowest_priority_connection_id + { + // Now that we know the connection is good to be inserted, try to establish it. + if let Err(e) = connection.establish_connection_loop(client.clone()).await { + if log_errors { + error!("Failed to establish connection to {address} due to error: {e:?}"); + } + // Remove the connection if it's not recoverable. + if !e.is_recoverable() { + self.remove_connection(&address).ok(); + } + continue; + } + self.maintain(connection_id, address); + } else { + // If any of the two conditions on the `if` statement above are not met, there is nothing to do. + // At this point we have already collected `max_connected` connections and also the current connection + // in the candidate list has a lower priority than the lowest priority maintained connection, and the next + // candidate connections as well since they are sorted by priority. + break; } } }