From fb64302de1531f02ba5e391ea2086b268b7a684f Mon Sep 17 00:00:00 2001 From: Josh Lind Date: Wed, 11 Sep 2024 20:30:26 -0400 Subject: [PATCH 1/4] [Consensus Observer] Make subscription creation asynchronous. --- .../src/consensus_observer/observer/mod.rs | 1 + .../observer/subscription.rs | 347 +------- .../observer/subscription_manager.rs | 555 ++++-------- .../observer/subscription_utils.rs | 823 ++++++++++++++++++ .../publisher/consensus_publisher.rs | 20 + 5 files changed, 1024 insertions(+), 722 deletions(-) create mode 100644 consensus/src/consensus_observer/observer/subscription_utils.rs diff --git a/consensus/src/consensus_observer/observer/mod.rs b/consensus/src/consensus_observer/observer/mod.rs index 35dd0ea2ec72e..4a4e5d42881a3 100644 --- a/consensus/src/consensus_observer/observer/mod.rs +++ b/consensus/src/consensus_observer/observer/mod.rs @@ -8,3 +8,4 @@ pub mod payload_store; pub mod pending_blocks; pub mod subscription; pub mod subscription_manager; +pub mod subscription_utils; diff --git a/consensus/src/consensus_observer/observer/subscription.rs b/consensus/src/consensus_observer/observer/subscription.rs index d3023da292d00..7b368fe3417c6 100644 --- a/consensus/src/consensus_observer/observer/subscription.rs +++ b/consensus/src/consensus_observer/observer/subscription.rs @@ -1,25 +1,17 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::consensus_observer::common::{ - error::Error, - logging::{LogEntry, LogSchema}, -}; +use crate::consensus_observer::{common::error::Error, observer::subscription_utils}; use aptos_config::{config::ConsensusObserverConfig, network_id::PeerNetworkId}; -use aptos_logger::{info, warn}; -use aptos_network::{application::metadata::PeerMetadata, ProtocolId}; +use aptos_network::application::metadata::PeerMetadata; use aptos_storage_interface::DbReader; use aptos_time_service::{TimeService, TimeServiceTrait}; -use ordered_float::OrderedFloat; use std::{ - collections::{BTreeMap, HashMap, 
HashSet}, + collections::{HashMap, HashSet}, sync::Arc, time::{Duration, Instant}, }; -// A useful constant for representing the maximum ping latency -const MAX_PING_LATENCY_SECS: f64 = 10_000.0; - /// A single consensus observer subscription pub struct ConsensusObserverSubscription { // The configuration of the consensus observer @@ -106,7 +98,8 @@ impl ConsensusObserverSubscription { self.last_optimality_check_time_and_peers = (time_now, current_connected_peers); // Sort the peers by subscription optimality - let sorted_peers = sort_peers_by_subscription_optimality(peers_and_metadata); + let sorted_peers = + subscription_utils::sort_peers_by_subscription_optimality(peers_and_metadata); // Verify that this peer is one of the most optimal peers let max_concurrent_subscriptions = @@ -184,142 +177,17 @@ impl ConsensusObserverSubscription { Ok(()) } + /// Returns the peer network id of the subscription + pub fn get_peer_network_id(&self) -> PeerNetworkId { + self.peer_network_id + } + /// Updates the last message receive time to the current time pub fn update_last_message_receive_time(&mut self) { self.last_message_receive_time = self.time_service.now(); } } -/// Gets the distance from the validators for the specified peer from the peer metadata -fn get_distance_for_peer( - peer_network_id: &PeerNetworkId, - peer_metadata: &PeerMetadata, -) -> Option { - // Get the distance for the peer - let peer_monitoring_metadata = peer_metadata.get_peer_monitoring_metadata(); - let distance = peer_monitoring_metadata - .latest_network_info_response - .as_ref() - .map(|response| response.distance_from_validators); - - // If the distance is missing, log a warning - if distance.is_none() { - warn!( - LogSchema::new(LogEntry::ConsensusObserver).message(&format!( - "Unable to get distance for peer! 
Peer: {:?}", - peer_network_id - )) - ); - } - - distance -} - -/// Gets the latency for the specified peer from the peer metadata -fn get_latency_for_peer( - peer_network_id: &PeerNetworkId, - peer_metadata: &PeerMetadata, -) -> Option { - // Get the latency for the peer - let peer_monitoring_metadata = peer_metadata.get_peer_monitoring_metadata(); - let latency = peer_monitoring_metadata.average_ping_latency_secs; - - // If the latency is missing, log a warning - if latency.is_none() { - warn!( - LogSchema::new(LogEntry::ConsensusObserver).message(&format!( - "Unable to get latency for peer! Peer: {:?}", - peer_network_id - )) - ); - } - - latency -} - -/// Sorts the peers by subscription optimality (in descending order of -/// optimality). This requires: (i) sorting the peers by distance from the -/// validator set and ping latency (lower values are more optimal); and (ii) -/// filtering out peers that don't support consensus observer. -/// -/// Note: we prioritize distance over latency as we want to avoid close -/// but not up-to-date peers. If peers don't have sufficient metadata -/// for sorting, they are given a lower priority. 
-pub fn sort_peers_by_subscription_optimality( - peers_and_metadata: &HashMap, -) -> Vec { - // Group peers and latencies by validator distance, i.e., distance -> [(peer, latency)] - let mut unsupported_peers = Vec::new(); - let mut peers_and_latencies_by_distance = BTreeMap::new(); - for (peer_network_id, peer_metadata) in peers_and_metadata { - // Verify that the peer supports consensus observer - if !supports_consensus_observer(peer_metadata) { - unsupported_peers.push(*peer_network_id); - continue; // Skip the peer - } - - // Get the distance and latency for the peer - let distance = get_distance_for_peer(peer_network_id, peer_metadata); - let latency = get_latency_for_peer(peer_network_id, peer_metadata); - - // If the distance is not found, use the maximum distance - let distance = - distance.unwrap_or(aptos_peer_monitoring_service_types::MAX_DISTANCE_FROM_VALIDATORS); - - // If the latency is not found, use a large latency - let latency = latency.unwrap_or(MAX_PING_LATENCY_SECS); - - // Add the peer and latency to the distance group - peers_and_latencies_by_distance - .entry(distance) - .or_insert_with(Vec::new) - .push((*peer_network_id, OrderedFloat(latency))); - } - - // If there are peers that don't support consensus observer, log them - if !unsupported_peers.is_empty() { - info!( - LogSchema::new(LogEntry::ConsensusObserver).message(&format!( - "Found {} peers that don't support consensus observer! Peers: {:?}", - unsupported_peers.len(), - unsupported_peers - )) - ); - } - - // Sort the peers by distance and latency. Note: BTreeMaps are - // sorted by key, so the entries will be sorted by distance in ascending order. 
- let mut sorted_peers = Vec::new(); - for (_, mut peers_and_latencies) in peers_and_latencies_by_distance { - // Sort the peers by latency - peers_and_latencies.sort_by_key(|(_, latency)| *latency); - - // Add the peers to the sorted list (in sorted order) - sorted_peers.extend( - peers_and_latencies - .into_iter() - .map(|(peer_network_id, _)| peer_network_id), - ); - } - - // Log the sorted peers - info!( - LogSchema::new(LogEntry::ConsensusObserver).message(&format!( - "Sorted {} peers by subscription optimality! Peers: {:?}", - sorted_peers.len(), - sorted_peers - )) - ); - - sorted_peers -} - -/// Returns true iff the peer metadata indicates support for consensus observer -fn supports_consensus_observer(peer_metadata: &PeerMetadata) -> bool { - peer_metadata.supports_protocol(ProtocolId::ConsensusObserver) - && peer_metadata.supports_protocol(ProtocolId::ConsensusObserverRpc) -} - #[cfg(test)] mod test { use super::*; @@ -328,10 +196,9 @@ mod test { use aptos_network::{ protocols::wire::handshake::v1::{MessagingProtocolVersion, ProtocolIdSet}, transport::{ConnectionId, ConnectionMetadata}, + ProtocolId, }; - use aptos_peer_monitoring_service_types::{ - response::NetworkInformationResponse, PeerMonitoringMetadata, - }; + use aptos_peer_monitoring_service_types::PeerMonitoringMetadata; use aptos_storage_interface::Result; use aptos_types::{network_address::NetworkAddress, transaction::Version}; use claims::assert_matches; @@ -735,117 +602,6 @@ mod test { assert_eq!(subscription.last_message_receive_time, current_time); } - #[test] - fn test_sort_peers_by_distance_and_latency() { - // Sort an empty list of peers - let peers_and_metadata = HashMap::new(); - assert!(sort_peers_by_subscription_optimality(&peers_and_metadata).is_empty()); - - // Create a list of peers with empty metadata - let peers_and_metadata = create_peers_and_metadata(true, true, true, 10); - - // Sort the peers and verify the results - let sorted_peers = 
sort_peers_by_subscription_optimality(&peers_and_metadata); - assert_eq!(sorted_peers.len(), 10); - - // Create a list of peers with valid metadata - let peers_and_metadata = create_peers_and_metadata(false, false, true, 10); - - // Sort the peers - let sorted_peers = sort_peers_by_subscription_optimality(&peers_and_metadata); - - // Verify the order of the peers - verify_increasing_distance_latencies(&peers_and_metadata, &sorted_peers); - assert_eq!(sorted_peers.len(), 10); - - // Create a list of peers with and without metadata - let mut peers_and_metadata = create_peers_and_metadata(false, false, true, 10); - peers_and_metadata.extend(create_peers_and_metadata(true, false, true, 10)); - peers_and_metadata.extend(create_peers_and_metadata(false, true, true, 10)); - peers_and_metadata.extend(create_peers_and_metadata(true, true, true, 10)); - - // Sort the peers - let sorted_peers = sort_peers_by_subscription_optimality(&peers_and_metadata); - assert_eq!(sorted_peers.len(), 40); - - // Verify the order of the first 20 peers - let (first_20_peers, sorted_peers) = sorted_peers.split_at(20); - verify_increasing_distance_latencies(&peers_and_metadata, first_20_peers); - - // Verify that the next 10 peers only have latency metadata - let (next_10_peers, sorted_peers) = sorted_peers.split_at(10); - for sorted_peer in next_10_peers { - let peer_metadata = peers_and_metadata.get(sorted_peer).unwrap(); - assert!(get_distance_for_peer(sorted_peer, peer_metadata).is_none()); - assert!(get_latency_for_peer(sorted_peer, peer_metadata).is_some()); - } - - // Verify that the last 10 peers have no metadata - let (last_10_peers, remaining_peers) = sorted_peers.split_at(10); - for sorted_peer in last_10_peers { - let peer_metadata = peers_and_metadata.get(sorted_peer).unwrap(); - assert!(get_distance_for_peer(sorted_peer, peer_metadata).is_none()); - assert!(get_latency_for_peer(sorted_peer, peer_metadata).is_none()); - } - assert!(remaining_peers.is_empty()); - } - - #[test] - fn 
test_sort_peers_by_distance_and_latency_filter() { - // Sort an empty list of peers - let peers_and_metadata = HashMap::new(); - assert!(sort_peers_by_subscription_optimality(&peers_and_metadata).is_empty()); - - // Create a list of peers with empty metadata (with consensus observer support) - let peers_and_metadata = create_peers_and_metadata(true, true, true, 10); - - // Sort the peers and verify the results - let sorted_peers = sort_peers_by_subscription_optimality(&peers_and_metadata); - assert_eq!(sorted_peers.len(), 10); - - // Create a list of peers with empty metadata (without consensus observer support) - let peers_and_metadata = create_peers_and_metadata(true, true, false, 10); - - // Sort the peers and verify the results - let sorted_peers = sort_peers_by_subscription_optimality(&peers_and_metadata); - assert!(sorted_peers.is_empty()); - - // Create a list of peers with valid metadata (without consensus observer support) - let peers_and_metadata = create_peers_and_metadata(false, false, false, 10); - - // Sort the peers and verify the results - let sorted_peers = sort_peers_by_subscription_optimality(&peers_and_metadata); - assert!(sorted_peers.is_empty()); - - // Create a list of peers with empty metadata (with and without consensus observer support) - let mut peers_and_metadata = create_peers_and_metadata(true, true, true, 5); - peers_and_metadata.extend(create_peers_and_metadata(true, true, false, 50)); - - // Sort the peers and verify the results (only the supported peers are sorted) - let sorted_peers = sort_peers_by_subscription_optimality(&peers_and_metadata); - assert_eq!(sorted_peers.len(), 5); - - // Create a list of peers with valid metadata (with and without consensus observer support) - let mut peers_and_metadata = create_peers_and_metadata(false, false, true, 50); - peers_and_metadata.extend(create_peers_and_metadata(false, false, false, 10)); - - // Sort the peers and verify the results (only the supported peers are sorted) - let 
sorted_peers = sort_peers_by_subscription_optimality(&peers_and_metadata); - assert_eq!(sorted_peers.len(), 50); - - // Create a list of peers with valid metadata (with and without consensus observer support) - let supported_peer_and_metadata = create_peers_and_metadata(false, false, true, 1); - let unsupported_peer_and_metadata = create_peers_and_metadata(false, false, false, 1); - let mut peers_and_metadata = HashMap::new(); - peers_and_metadata.extend(supported_peer_and_metadata.clone()); - peers_and_metadata.extend(unsupported_peer_and_metadata); - - // Sort the peers and verify the results (only the supported peer is sorted) - let supported_peer = supported_peer_and_metadata.keys().next().unwrap(); - let sorted_peers = sort_peers_by_subscription_optimality(&peers_and_metadata); - assert_eq!(sorted_peers, vec![*supported_peer]); - } - /// Adds metadata for the specified peer to the map of peers and metadata fn add_metadata_for_peer( peers_and_metadata: &mut HashMap, @@ -901,85 +657,6 @@ mod test { } } - /// Creates a new peer and metadata for testing - fn create_peer_and_metadata( - latency: Option, - distance_from_validators: Option, - support_consensus_observer: bool, - ) -> (PeerNetworkId, PeerMetadata) { - // Create a random peer - let peer_network_id = PeerNetworkId::random(); - - // Create a new peer metadata with the given latency and distance - let connection_metadata = - create_connection_metadata(peer_network_id, support_consensus_observer); - let network_information_response = - distance_from_validators.map(|distance| NetworkInformationResponse { - connected_peers: BTreeMap::new(), - distance_from_validators: distance, - }); - let peer_monitoring_metadata = - PeerMonitoringMetadata::new(latency, None, network_information_response, None, None); - let peer_metadata = - PeerMetadata::new_for_test(connection_metadata, peer_monitoring_metadata); - - (peer_network_id, peer_metadata) - } - - /// Creates a list of peers and metadata for testing - fn 
create_peers_and_metadata( - empty_latency: bool, - empty_distance: bool, - support_consensus_observer: bool, - num_peers: u64, - ) -> HashMap { - let mut peers_and_metadata = HashMap::new(); - for i in 1..num_peers + 1 { - // Determine the distance for the peer - let distance = if empty_distance { None } else { Some(i) }; - - // Determine the latency for the peer - let latency = if empty_latency { None } else { Some(i as f64) }; - - // Create a new peer and metadata - let (peer_network_id, peer_metadata) = - create_peer_and_metadata(latency, distance, support_consensus_observer); - peers_and_metadata.insert(peer_network_id, peer_metadata); - } - peers_and_metadata - } - - /// Verifies that the distance and latencies for the peers are in - /// increasing order (with the distance taking precedence over the latency). - fn verify_increasing_distance_latencies( - peers_and_metadata: &HashMap, - sorted_peers: &[PeerNetworkId], - ) { - let mut previous_latency = None; - let mut previous_distance = 0; - for sorted_peer in sorted_peers { - // Get the distance and latency for the peer - let peer_metadata = peers_and_metadata.get(sorted_peer).unwrap(); - let distance = get_distance_for_peer(sorted_peer, peer_metadata).unwrap(); - let latency = get_latency_for_peer(sorted_peer, peer_metadata); - - // Verify the order of the peers - if distance == previous_distance { - if let Some(latency) = latency { - if let Some(previous_latency) = previous_latency { - assert!(latency >= previous_latency); - } - } - } else { - assert!(distance > previous_distance); - } - - // Update the previous latency and distance - previous_latency = latency; - previous_distance = distance; - } - } - /// Verifies that the last check time and peers are as expected fn verify_last_check_time_and_peers( subscription: &ConsensusObserverSubscription, diff --git a/consensus/src/consensus_observer/observer/subscription_manager.rs b/consensus/src/consensus_observer/observer/subscription_manager.rs index 
e63fdfc68fa23..16cd756c176dd 100644 --- a/consensus/src/consensus_observer/observer/subscription_manager.rs +++ b/consensus/src/consensus_observer/observer/subscription_manager.rs @@ -13,21 +13,27 @@ use crate::consensus_observer::{ ConsensusObserverMessage, ConsensusObserverRequest, ConsensusObserverResponse, }, }, - observer::{subscription, subscription::ConsensusObserverSubscription}, + observer::{subscription::ConsensusObserverSubscription, subscription_utils}, publisher::consensus_publisher::ConsensusPublisher, }; use aptos_config::{config::ConsensusObserverConfig, network_id::PeerNetworkId}; +use aptos_infallible::Mutex; use aptos_logger::{error, info, warn}; use aptos_network::application::{interface::NetworkClient, metadata::PeerMetadata}; use aptos_storage_interface::DbReader; use aptos_time_service::TimeService; use itertools::Itertools; use std::{collections::HashMap, sync::Arc}; +use tokio::task::JoinHandle; /// The manager for consensus observer subscriptions pub struct SubscriptionManager { // The currently active set of consensus observer subscriptions - active_observer_subscriptions: HashMap, + active_observer_subscriptions: + Arc>>, + + // The active subscription creation task (if one is currently running) + active_subscription_creation_task: Arc>>>, // The consensus observer client to send network messages consensus_observer_client: @@ -57,7 +63,8 @@ impl SubscriptionManager { time_service: TimeService, ) -> Self { Self { - active_observer_subscriptions: HashMap::new(), + active_observer_subscriptions: Arc::new(Mutex::new(HashMap::new())), + active_subscription_creation_task: Arc::new(Mutex::new(None)), consensus_observer_client, consensus_observer_config, consensus_publisher, @@ -73,7 +80,12 @@ impl SubscriptionManager { connected_peers_and_metadata: &HashMap, peer_network_id: PeerNetworkId, ) -> Result<(), Error> { - match self.active_observer_subscriptions.get_mut(&peer_network_id) { + // Get the active subscription for the peer + let mut 
active_observer_subscriptions = self.active_observer_subscriptions.lock(); + let active_subscription = active_observer_subscriptions.get_mut(&peer_network_id); + + // Check the health of the subscription + match active_subscription { Some(active_subscription) => { // Verify the peer is still connected if !connected_peers_and_metadata.contains_key(&peer_network_id) { @@ -121,210 +133,39 @@ impl SubscriptionManager { && num_terminated_subscriptions == initial_subscription_peers.len(); // Calculate the number of new subscriptions to create + let remaining_subscription_peers = self.get_active_subscription_peers(); let max_concurrent_subscriptions = self.consensus_observer_config.max_concurrent_subscriptions as usize; let num_subscriptions_to_create = - max_concurrent_subscriptions.saturating_sub(self.active_observer_subscriptions.len()); - - // Create the new subscriptions (if required) - let terminated_subscription_peers = terminated_subscriptions - .iter() - .map(|(peer, _)| *peer) - .collect(); - let new_subscription_peers = self - .create_new_subscriptions( - connected_peers_and_metadata, - num_subscriptions_to_create, - terminated_subscription_peers, - ) - .await; + max_concurrent_subscriptions.saturating_sub(remaining_subscription_peers.len()); - // Log a warning if we failed to create as many subscriptions as requested - let num_subscriptions_created = new_subscription_peers.len(); - if num_subscriptions_created < num_subscriptions_to_create { - warn!( - LogSchema::new(LogEntry::ConsensusObserver).message(&format!( - "Failed to create the requested number of subscriptions! 
Number of subscriptions \ - requested: {:?}, number of subscriptions created: {:?}.", - num_subscriptions_to_create, - num_subscriptions_created - )) - ); - } + // Update the total subscription metrics + update_total_subscription_metrics(&remaining_subscription_peers); - // Update the subscription metrics - self.update_subscription_metrics(&new_subscription_peers, terminated_subscriptions); + // Spawn a task to create the new subscriptions (asynchronously) + self.spawn_subscription_creation_task( + num_subscriptions_to_create, + remaining_subscription_peers, + terminated_subscriptions, + connected_peers_and_metadata, + ) + .await; // Return an error if all subscriptions were terminated if all_subscriptions_terminated { Err(Error::SubscriptionsReset(format!( - "All subscriptions were unhealthy and terminated! Number of terminated \ - subscriptions: {:?}, number of new subscriptions created: {:?}.", - num_terminated_subscriptions, num_subscriptions_created, + "All {:?} subscriptions were unhealthy and terminated!", + num_terminated_subscriptions, ))) } else { Ok(()) } } - /// Attempts to create the given number of new subscriptions - /// and returns the peer IDs of the newly created subscriptions. - /// Any `unhealthy_subscription_peers` are excluded from selection. 
- async fn create_new_subscriptions( - &mut self, - connected_peers_and_metadata: HashMap, - num_subscriptions_to_create: usize, - unhealthy_subscription_peers: Vec, - ) -> Vec { - // Return early if we don't need to create any new subscriptions - if num_subscriptions_to_create == 0 { - return vec![]; - } - - // Sort the potential peers for subscription requests - let mut sorted_potential_peers = match self.sort_peers_for_subscriptions( - connected_peers_and_metadata, - unhealthy_subscription_peers, - ) { - Some(sorted_peers) => sorted_peers, - None => { - error!(LogSchema::new(LogEntry::ConsensusObserver) - .message("Failed to sort peers for subscription requests!")); - return vec![]; - }, - }; - - // Verify that we have potential peers to subscribe to - if sorted_potential_peers.is_empty() { - warn!(LogSchema::new(LogEntry::ConsensusObserver) - .message("There are no potential peers to subscribe to!")); - return vec![]; - } - - // Go through the potential peers and attempt to create new subscriptions - let mut created_subscription_peers = vec![]; - for _ in 0..num_subscriptions_to_create { - // If there are no peers left to subscribe to, return early - if sorted_potential_peers.is_empty() { - info!( - LogSchema::new(LogEntry::ConsensusObserver).message(&format!( - "There are no more potential peers to subscribe to! 
\ - Num created subscriptions: {:?}", - created_subscription_peers.len() - )) - ); - break; - } - - // Attempt to create a subscription - let (subscription_peer, failed_subscription_peers) = self - .create_single_subscription(sorted_potential_peers.clone()) - .await; - - // Remove the failed peers from the sorted list - sorted_potential_peers.retain(|peer| !failed_subscription_peers.contains(peer)); - - // Process a successful subscription creation - if let Some(subscription_peer) = subscription_peer { - // Add the peer to the list of created subscriptions - created_subscription_peers.push(subscription_peer); - - // Remove the peer from the sorted list (for the next selection) - sorted_potential_peers.retain(|peer| peer != &subscription_peer); - } - } - - // Return the list of created subscriptions - created_subscription_peers - } - - /// Attempts to create a new subscription to a single peer from - /// the sorted list of potential peers. If a new subscription is - /// successfully created, the peer is returned. Likewise, any - /// peers with failed subscription attempts are also returned. - async fn create_single_subscription( - &mut self, - sorted_potential_peers: Vec, - ) -> (Option, Vec) { - let mut peers_with_failed_attempts = vec![]; - for potential_peer in sorted_potential_peers { - // Log the subscription attempt - info!( - LogSchema::new(LogEntry::ConsensusObserver).message(&format!( - "Attempting to subscribe to potential peer: {}!", - potential_peer - )) - ); - - // Send a subscription request to the peer and wait for the response. - // TODO: we should make this non-blocking! 
- let subscription_request = ConsensusObserverRequest::Subscribe; - let request_timeout_ms = self.consensus_observer_config.network_request_timeout_ms; - let response = self - .consensus_observer_client - .send_rpc_request_to_peer(&potential_peer, subscription_request, request_timeout_ms) - .await; - - // Process the response and update the active subscription - match response { - Ok(ConsensusObserverResponse::SubscribeAck) => { - // Log the successful subscription - info!( - LogSchema::new(LogEntry::ConsensusObserver).message(&format!( - "Successfully subscribed to peer: {}!", - potential_peer - )) - ); - - // Create the new subscription - let subscription = ConsensusObserverSubscription::new( - self.consensus_observer_config, - self.db_reader.clone(), - potential_peer, - self.time_service.clone(), - ); - - // Add the subscription to the active subscriptions - self.active_observer_subscriptions - .insert(potential_peer, subscription); - - // Return the successful subscription peer - return (Some(potential_peer), peers_with_failed_attempts); - }, - Ok(response) => { - // We received an invalid response - warn!( - LogSchema::new(LogEntry::ConsensusObserver).message(&format!( - "Got unexpected response type for subscription request: {:?}", - response.get_label() - )) - ); - - // Add the peer to the list of failed attempts - peers_with_failed_attempts.push(potential_peer); - }, - Err(error) => { - // We encountered an error while sending the request - error!( - LogSchema::new(LogEntry::ConsensusObserver).message(&format!( - "Failed to send subscription request to peer: {}! 
Error: {:?}", - potential_peer, error - )) - ); - - // Add the peer to the list of failed attempts - peers_with_failed_attempts.push(potential_peer); - }, - } - } - - // We failed to create a new subscription - (None, peers_with_failed_attempts) - } - /// Returns the currently active subscription peers fn get_active_subscription_peers(&self) -> Vec { - self.active_observer_subscriptions.keys().cloned().collect() + let active_observer_subscriptions = self.active_observer_subscriptions.lock(); + active_observer_subscriptions.keys().cloned().collect() } /// Gets the connected peers and metadata. If an error @@ -347,38 +188,89 @@ impl SubscriptionManager { }) } - /// Produces a list of sorted peers to service our subscription requests. - /// Note: if `unhealthy_subscription_peers` are provided, they will be excluded - /// from the selection process. Likewise, all peers currently subscribed to us - /// will be excluded from the selection process. - fn sort_peers_for_subscriptions( + /// Spawns a new subscription creation task to create + /// the specified number of new subscriptions. 
+ async fn spawn_subscription_creation_task( &mut self, - mut connected_peers_and_metadata: HashMap, - unhealthy_subscription_peers: Vec, - ) -> Option> { - // Remove any peers we're already subscribed to - for active_subscription_peer in self.get_active_subscription_peers() { - let _ = connected_peers_and_metadata.remove(&active_subscription_peer); + num_subscriptions_to_create: usize, + active_subscription_peers: Vec, + terminated_subscriptions: Vec<(PeerNetworkId, Error)>, + connected_peers_and_metadata: HashMap, + ) { + // If there are no new subscriptions to create, return early + if num_subscriptions_to_create == 0 { + return; } - // Remove any unhealthy subscription peers - for unhealthy_peer in unhealthy_subscription_peers { - let _ = connected_peers_and_metadata.remove(&unhealthy_peer); + // If there is an active subscription creation task, return early + if let Some(subscription_creation_task) = &*self.active_subscription_creation_task.lock() { + if !subscription_creation_task.is_finished() { + return; // The task is still running + } } - // Remove any peers that are currently subscribed to us - if let Some(consensus_publisher) = &self.consensus_publisher { - for peer_network_id in consensus_publisher.get_active_subscribers() { - let _ = connected_peers_and_metadata.remove(&peer_network_id); + // Clone the shared state for the task + let active_observer_subscriptions = self.active_observer_subscriptions.clone(); + let consensus_observer_config = self.consensus_observer_config; + let consensus_observer_client = self.consensus_observer_client.clone(); + let consensus_publisher = self.consensus_publisher.clone(); + let db_reader = self.db_reader.clone(); + let time_service = self.time_service.clone(); + + // Otherwise, we should spawn a new subscription creation task + let subscription_creation_task = tokio::spawn(async move { + // Identify the terminated subscription peers + let terminated_subscription_peers = terminated_subscriptions + .iter() + 
.map(|(peer, _)| *peer) + .collect(); + + // Create the new subscriptions + let new_subscriptions = subscription_utils::create_new_subscriptions( + consensus_observer_config, + consensus_observer_client, + consensus_publisher, + db_reader, + time_service, + connected_peers_and_metadata, + num_subscriptions_to_create, + active_subscription_peers, + terminated_subscription_peers, + ) + .await; + + // Identify the new subscription peers + let new_subscription_peers = new_subscriptions + .iter() + .map(|subscription| subscription.get_peer_network_id()) + .collect::>(); + + // Add the new subscriptions to the list of active subscriptions + for subscription in new_subscriptions { + active_observer_subscriptions + .lock() + .insert(subscription.get_peer_network_id(), subscription); } - } - // Sort the peers by subscription optimality - let sorted_peers = - subscription::sort_peers_by_subscription_optimality(&connected_peers_and_metadata); + // Log a warning if we failed to create as many subscriptions as requested + let num_subscriptions_created = new_subscription_peers.len(); + if num_subscriptions_created < num_subscriptions_to_create { + warn!( + LogSchema::new(LogEntry::ConsensusObserver).message(&format!( + "Failed to create the requested number of subscriptions! 
Number of subscriptions \ + requested: {:?}, number of subscriptions created: {:?}.", + num_subscriptions_to_create, + num_subscriptions_created + )) + ); + } + + // Update the subscription change metrics + update_subscription_change_metrics(new_subscription_peers, terminated_subscriptions); + }); - // Return the sorted peers - Some(sorted_peers) + // Update the active subscription creation task + *self.active_subscription_creation_task.lock() = Some(subscription_creation_task); } /// Terminates any unhealthy subscriptions and returns the list of terminated subscriptions @@ -414,7 +306,9 @@ impl SubscriptionManager { /// Unsubscribes from the given peer by sending an unsubscribe request fn unsubscribe_from_peer(&mut self, peer_network_id: PeerNetworkId) { // Remove the peer from the active subscriptions - self.active_observer_subscriptions.remove(&peer_network_id); + self.active_observer_subscriptions + .lock() + .remove(&peer_network_id); // Send an unsubscribe request to the peer and process the response. // Note: we execute this asynchronously, as we don't need to wait for the response. 
@@ -463,65 +357,68 @@ impl SubscriptionManager { }); } - /// Updates the subscription creation and termination metrics - fn update_subscription_metrics( - &self, - new_subscription_peers: &[PeerNetworkId], - terminated_subscription_peers: Vec<(PeerNetworkId, Error)>, - ) { - // Update the created subscriptions metrics - for peer_network_id in new_subscription_peers { - metrics::increment_counter( - &metrics::OBSERVER_CREATED_SUBSCRIPTIONS, - metrics::CREATED_SUBSCRIPTION_LABEL, - peer_network_id, - ); - } - - // Update the terminated subscriptions metrics - for (peer_network_id, termination_reason) in terminated_subscription_peers { - metrics::increment_counter( - &metrics::OBSERVER_TERMINATED_SUBSCRIPTIONS, - termination_reason.get_label(), - &peer_network_id, - ); - } - - // Set the number of active subscriptions (grouped by network ID) - let active_subscription_peers = self.get_active_subscription_peers(); - for (network_id, active_subscription_peers) in &active_subscription_peers - .iter() - .chunk_by(|peer_network_id| peer_network_id.network_id()) - { - metrics::set_gauge( - &metrics::OBSERVER_NUM_ACTIVE_SUBSCRIPTIONS, - &network_id, - active_subscription_peers.collect::>().len() as i64, - ); - } - } - - /// Verifies that the message is from an active subscription. - /// If not, an error is returned. + /// Verifies that the message is from an active + /// subscription. If not, an error is returned. 
pub fn verify_message_for_subscription( &mut self, message_sender: PeerNetworkId, ) -> Result<(), Error> { - match self.active_observer_subscriptions.get_mut(&message_sender) { - Some(active_subscription) => { - // The message is from an active subscription (update the last message time) - active_subscription.update_last_message_receive_time(); - Ok(()) - }, - None => { - // The message is not from an active subscription (send another unsubscribe request) - self.unsubscribe_from_peer(message_sender); - Err(Error::InvalidMessageError(format!( - "Received message from unexpected peer, and not an active subscription: {}!", - message_sender - ))) - }, + // Check if the message is from an active subscription + if let Some(active_subscription) = self + .active_observer_subscriptions + .lock() + .get_mut(&message_sender) + { + // Update the last message receive time and return early + active_subscription.update_last_message_receive_time(); + return Ok(()); } + + // Otherwise, the message is not from an active subscription. + // Send another unsubscribe request, and return an error. 
+ self.unsubscribe_from_peer(message_sender); + Err(Error::InvalidMessageError(format!( + "Received message from unexpected peer, and not an active subscription: {}!", + message_sender + ))) + } +} + +/// Updates the subscription creation and termination metrics +fn update_subscription_change_metrics( + new_subscription_peers: Vec, + terminated_subscription_peers: Vec<(PeerNetworkId, Error)>, +) { + // Update the created subscriptions metrics + for peer_network_id in new_subscription_peers { + metrics::increment_counter( + &metrics::OBSERVER_CREATED_SUBSCRIPTIONS, + metrics::CREATED_SUBSCRIPTION_LABEL, + &peer_network_id, + ); + } + + // Update the terminated subscriptions metrics + for (peer_network_id, termination_reason) in terminated_subscription_peers { + metrics::increment_counter( + &metrics::OBSERVER_TERMINATED_SUBSCRIPTIONS, + termination_reason.get_label(), + &peer_network_id, + ); + } +} + +/// Updates the total subscription metrics (grouped by network ID) +fn update_total_subscription_metrics(active_subscription_peers: &[PeerNetworkId]) { + for (network_id, active_subscription_peers) in &active_subscription_peers + .iter() + .chunk_by(|peer_network_id| peer_network_id.network_id()) + { + metrics::set_gauge( + &metrics::OBSERVER_NUM_ACTIVE_SUBSCRIPTIONS, + &network_id, + active_subscription_peers.collect::>().len() as i64, + ); } } @@ -800,92 +697,6 @@ mod test { .is_empty()); } - #[tokio::test] - async fn test_sort_peers_for_subscriptions() { - // Create a consensus observer client - let network_ids = &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public]; - let (peers_and_metadata, consensus_observer_client) = - create_consensus_observer_client(network_ids); - - // Create a new subscription manager - let consensus_observer_config = ConsensusObserverConfig::default(); - let db_reader = create_mock_db_reader(); - let mut subscription_manager = SubscriptionManager::new( - consensus_observer_client, - consensus_observer_config, - None, - 
db_reader.clone(), - TimeService::mock(), - ); - - // Sort the peers and verify that no peers are returned - let sorted_peers = sort_subscription_peers(&mut subscription_manager, vec![]); - assert!(sorted_peers.is_empty()); - - // Add a connected validator peer, VFN peer and public peer - for network_id in network_ids { - let distance_from_validators = match network_id { - NetworkId::Validator => 0, - NetworkId::Vfn => 1, - NetworkId::Public => 2, - }; - create_peer_and_connection( - *network_id, - peers_and_metadata.clone(), - distance_from_validators, - None, - true, - ); - } - - // Sort the peers and verify the ordering (according to distance) - let sorted_peers = sort_subscription_peers(&mut subscription_manager, vec![]); - assert_eq!(sorted_peers[0].network_id(), NetworkId::Validator); - assert_eq!(sorted_peers[1].network_id(), NetworkId::Vfn); - assert_eq!(sorted_peers[2].network_id(), NetworkId::Public); - assert_eq!(sorted_peers.len(), 3); - - // Sort the peers, but mark the validator as unhealthy (so it's ignored) - let sorted_peer_subset = - sort_subscription_peers(&mut subscription_manager, vec![sorted_peers[0]]); - assert_eq!(sorted_peer_subset[0].network_id(), NetworkId::Vfn); - assert_eq!(sorted_peer_subset[1].network_id(), NetworkId::Public); - assert_eq!(sorted_peer_subset.len(), 2); - - // Sort the peers, but mark the VFN and validator as unhealthy (so they're ignored) - let sorted_peer_subset = sort_subscription_peers(&mut subscription_manager, vec![ - sorted_peers[0], - sorted_peers[1], - ]); - assert_eq!(sorted_peer_subset[0].network_id(), NetworkId::Public); - assert_eq!(sorted_peer_subset.len(), 1); - - // Remove all the peers and verify that no peers are returned upon sorting - for peer_network_id in sorted_peers { - remove_peer_and_connection(peers_and_metadata.clone(), peer_network_id); - } - let sorted_peers = sort_subscription_peers(&mut subscription_manager, vec![]); - assert!(sorted_peers.is_empty()); - - // Add multiple validator 
peers, with different latencies - let mut validator_peers = vec![]; - for ping_latency_secs in [0.9, 0.8, 0.5, 0.1, 0.05] { - let validator_peer = create_peer_and_connection( - NetworkId::Validator, - peers_and_metadata.clone(), - 0, - Some(ping_latency_secs), - true, - ); - validator_peers.push(validator_peer); - } - - // Sort the peers and verify the ordering (according to latency) - let sorted_peers = sort_subscription_peers(&mut subscription_manager, vec![]); - let expected_peers = validator_peers.into_iter().rev().collect::>(); - assert_eq!(sorted_peers, expected_peers); - } - #[tokio::test] async fn test_terminate_unhealthy_subscriptions() { // Create a consensus observer client @@ -1226,6 +1037,7 @@ mod test { ); subscription_manager .active_observer_subscriptions + .lock() .insert(subscription_peer, observer_subscription); } @@ -1284,37 +1096,6 @@ mod test { peer_network_id } - /// Removes the peer and connection metadata for the given peer - fn remove_peer_and_connection( - peers_and_metadata: Arc, - peer_network_id: PeerNetworkId, - ) { - let peer_metadata = peers_and_metadata - .get_metadata_for_peer(peer_network_id) - .unwrap(); - let connection_id = peer_metadata.get_connection_metadata().connection_id; - peers_and_metadata - .remove_peer_metadata(peer_network_id, connection_id) - .unwrap(); - } - - /// A simple helper method that sorts the given peers for a subscription - fn sort_subscription_peers( - subscription_manager: &mut SubscriptionManager, - unhealthy_subscription_peers: Vec, - ) -> Vec { - // Get the connected peers and metadata - let connected_peers_and_metadata = subscription_manager.get_connected_peers_and_metadata(); - - // Sort the peers for subscription requests - subscription_manager - .sort_peers_for_subscriptions( - connected_peers_and_metadata, - unhealthy_subscription_peers, - ) - .unwrap() - } - /// A simple helper method that terminates any unhealthy subscriptions fn terminate_any_unhealthy_subscriptions( subscription_manager: 
&mut SubscriptionManager, diff --git a/consensus/src/consensus_observer/observer/subscription_utils.rs b/consensus/src/consensus_observer/observer/subscription_utils.rs new file mode 100644 index 0000000000000..7dd5ffa9b2ace --- /dev/null +++ b/consensus/src/consensus_observer/observer/subscription_utils.rs @@ -0,0 +1,823 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::consensus_observer::{ + common::logging::{LogEntry, LogSchema}, + network::{ + observer_client::ConsensusObserverClient, + observer_message::{ + ConsensusObserverMessage, ConsensusObserverRequest, ConsensusObserverResponse, + }, + }, + observer::subscription::ConsensusObserverSubscription, + publisher::consensus_publisher::ConsensusPublisher, +}; +use aptos_config::{config::ConsensusObserverConfig, network_id::PeerNetworkId}; +use aptos_logger::{error, info, warn}; +use aptos_network::{ + application::{interface::NetworkClient, metadata::PeerMetadata}, + ProtocolId, +}; +use aptos_storage_interface::DbReader; +use aptos_time_service::TimeService; +use ordered_float::OrderedFloat; +use std::{ + collections::{BTreeMap, HashMap}, + sync::Arc, +}; + +// A useful constant for representing the maximum ping latency +const MAX_PING_LATENCY_SECS: f64 = 10_000.0; + +/// Attempts to create the given number of new subscriptions +/// from the connected peers and metadata. Any active or unhealthy +/// subscriptions are excluded from the selection process. 
+///
+/// Returns the subscriptions that were successfully created. Fewer than
+/// `num_subscriptions_to_create` subscriptions (possibly none) are returned
+/// if the potential peers are exhausted first.
+pub async fn create_new_subscriptions(
+    consensus_observer_config: ConsensusObserverConfig,
+    consensus_observer_client: Arc<
+        ConsensusObserverClient<NetworkClient<ConsensusObserverMessage>>,
+    >,
+    consensus_publisher: Option<Arc<ConsensusPublisher>>,
+    db_reader: Arc<dyn DbReader>,
+    time_service: TimeService,
+    connected_peers_and_metadata: HashMap<PeerNetworkId, PeerMetadata>,
+    num_subscriptions_to_create: usize,
+    active_subscription_peers: Vec<PeerNetworkId>,
+    unhealthy_subscription_peers: Vec<PeerNetworkId>,
+) -> Vec<ConsensusObserverSubscription> {
+    // Sort the potential peers for subscription requests. Note: the
+    // peer lists are passed in the order declared by the callee (i.e.,
+    // the active peers first, followed by the unhealthy peers).
+    let mut sorted_potential_peers = match sort_peers_for_subscriptions(
+        connected_peers_and_metadata,
+        active_subscription_peers,
+        unhealthy_subscription_peers,
+        consensus_publisher,
+    ) {
+        Some(sorted_peers) => sorted_peers,
+        None => {
+            error!(LogSchema::new(LogEntry::ConsensusObserver)
+                .message("Failed to sort peers for subscription requests!"));
+            return vec![];
+        },
+    };
+
+    // Verify that we have potential peers to subscribe to
+    if sorted_potential_peers.is_empty() {
+        warn!(LogSchema::new(LogEntry::ConsensusObserver)
+            .message("There are no potential peers to subscribe to!"));
+        return vec![];
+    }
+
+    // Go through the potential peers and attempt to create new subscriptions
+    let mut created_subscriptions = vec![];
+    for _ in 0..num_subscriptions_to_create {
+        // If there are no peers left to subscribe to, return early
+        if sorted_potential_peers.is_empty() {
+            info!(
+                LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
+                    "There are no more potential peers to subscribe to! \
+                    Num created subscriptions: {:?}",
+                    created_subscriptions.len()
+                ))
+            );
+            break;
+        }
+
+        // Attempt to create a new subscription. The remaining peers are
+        // cloned because the list is pruned again after each attempt.
+        let (observer_subscription, failed_subscription_peers) = create_single_subscription(
+            consensus_observer_config,
+            consensus_observer_client.clone(),
+            db_reader.clone(),
+            sorted_potential_peers.clone(),
+            time_service.clone(),
+        )
+        .await;
+
+        // Remove the failed peers from the sorted list (so that
+        // they are not retried when creating the next subscription).
+        sorted_potential_peers.retain(|peer| !failed_subscription_peers.contains(peer));
+
+        // Process a successful subscription creation
+        if let Some(observer_subscription) = observer_subscription {
+            // Remove the peer from the sorted list (for the next selection)
+            sorted_potential_peers
+                .retain(|peer| *peer != observer_subscription.get_peer_network_id());
+
+            // Add the newly created subscription to the subscription list
+            created_subscriptions.push(observer_subscription);
+        }
+    }
+
+    // Return the list of created subscriptions
+    created_subscriptions
+}
+
+/// Attempts to create a new subscription to a single peer from the
+/// sorted list of potential peers. If successful, the new subscription
+/// is returned, alongside any peers with failed attempts.
+///
+/// The peers are tried in the given order, and the first peer that
+/// acknowledges the subscription request is used. If no peer acknowledges
+/// the request, `None` is returned alongside all of the attempted peers.
+async fn create_single_subscription(
+    consensus_observer_config: ConsensusObserverConfig,
+    consensus_observer_client: Arc<
+        ConsensusObserverClient<NetworkClient<ConsensusObserverMessage>>,
+    >,
+    db_reader: Arc<dyn DbReader>,
+    sorted_potential_peers: Vec<PeerNetworkId>,
+    time_service: TimeService,
+) -> (Option<ConsensusObserverSubscription>, Vec<PeerNetworkId>) {
+    let mut peers_with_failed_attempts = vec![];
+    for potential_peer in sorted_potential_peers {
+        // Log the subscription attempt
+        info!(
+            LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
+                "Attempting to subscribe to potential peer: {}!",
+                potential_peer
+            ))
+        );
+
+        // Send a subscription request to the peer and wait for the response
+        let subscription_request = ConsensusObserverRequest::Subscribe;
+        let request_timeout_ms = consensus_observer_config.network_request_timeout_ms;
+        let response = consensus_observer_client
+            .send_rpc_request_to_peer(&potential_peer, subscription_request, request_timeout_ms)
+            .await;
+
+        // Process the response and update the active subscription
+        match response {
+            Ok(ConsensusObserverResponse::SubscribeAck) => {
+                // Log the successful subscription
+                info!(
+                    LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
+                        "Successfully subscribed to peer: {}!",
+                        potential_peer
+                    ))
+                );
+
+                // Create the new subscription
+                let subscription = ConsensusObserverSubscription::new(
+                    consensus_observer_config,
+                    db_reader.clone(),
+                    potential_peer,
+                    time_service.clone(),
+                );
+
+                // Return the successful subscription (and stop trying other peers)
+                return (Some(subscription), peers_with_failed_attempts);
+            },
+            Ok(response) => {
+                // We received an invalid response
+                warn!(
+                    LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
+                        "Got unexpected response type for subscription request: {:?}",
+                        response.get_label()
+                    ))
+                );
+
+                // Add the peer to the list of failed attempts
+                peers_with_failed_attempts.push(potential_peer);
+            },
+            Err(error) => {
+                // We encountered an error while sending the request
+                error!(
+                    LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
+                        "Failed to send subscription request to peer: {}! Error: {:?}",
+                        potential_peer, error
+                    ))
+                );
+
+                // Add the peer to the list of failed attempts
+                peers_with_failed_attempts.push(potential_peer);
+            },
+        }
+    }
+
+    // We failed to create a new subscription
+    (None, peers_with_failed_attempts)
+}
+
+/// Gets the distance from the validators for the specified peer from the
+/// peer metadata. Returns `None` (and logs a warning) if the peer has no
+/// network information response from which to read the distance.
+fn get_distance_for_peer(
+    peer_network_id: &PeerNetworkId,
+    peer_metadata: &PeerMetadata,
+) -> Option<u64> {
+    // Get the distance for the peer
+    let peer_monitoring_metadata = peer_metadata.get_peer_monitoring_metadata();
+    let distance = peer_monitoring_metadata
+        .latest_network_info_response
+        .as_ref()
+        .map(|response| response.distance_from_validators);
+
+    // If the distance is missing, log a warning
+    if distance.is_none() {
+        warn!(
+            LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
+                "Unable to get distance for peer! Peer: {:?}",
+                peer_network_id
+            ))
+        );
+    }
+
+    distance
+}
+
+/// Gets the latency (i.e., the average ping latency, in seconds) for the
+/// specified peer from the peer metadata. Returns `None` (and logs a
+/// warning) if the latency is missing from the metadata.
+fn get_latency_for_peer(
+    peer_network_id: &PeerNetworkId,
+    peer_metadata: &PeerMetadata,
+) -> Option<f64> {
+    // Get the latency for the peer
+    let peer_monitoring_metadata = peer_metadata.get_peer_monitoring_metadata();
+    let latency = peer_monitoring_metadata.average_ping_latency_secs;
+
+    // If the latency is missing, log a warning
+    if latency.is_none() {
+        warn!(
+            LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
+                "Unable to get latency for peer! Peer: {:?}",
+                peer_network_id
+            ))
+        );
+    }
+
+    latency
+}
+
+/// Produces a list of sorted peers to service the subscription requests.
+/// Any active or unhealthy subscriptions are excluded from the selection process.
+/// Likewise, any peers currently subscribed to us are also excluded.
+///
+/// Note: this function currently always returns `Some` (the list of
+/// sorted peers may be empty if all connected peers were excluded).
+fn sort_peers_for_subscriptions(
+    mut connected_peers_and_metadata: HashMap<PeerNetworkId, PeerMetadata>,
+    active_subscription_peers: Vec<PeerNetworkId>,
+    unhealthy_subscription_peers: Vec<PeerNetworkId>,
+    consensus_publisher: Option<Arc<ConsensusPublisher>>,
+) -> Option<Vec<PeerNetworkId>> {
+    // Remove any peers we're already subscribed to
+    for active_subscription_peer in active_subscription_peers {
+        let _ = connected_peers_and_metadata.remove(&active_subscription_peer);
+    }
+
+    // Remove any unhealthy subscription peers
+    for unhealthy_peer in unhealthy_subscription_peers {
+        let _ = connected_peers_and_metadata.remove(&unhealthy_peer);
+    }
+
+    // Remove any peers that are currently subscribed to us (this
+    // is skipped entirely if there is no consensus publisher).
+    if let Some(consensus_publisher) = consensus_publisher {
+        for peer_network_id in consensus_publisher.get_active_subscribers() {
+            let _ = connected_peers_and_metadata.remove(&peer_network_id);
+        }
+    }
+
+    // Sort the remaining peers by subscription optimality
+    let sorted_peers = sort_peers_by_subscription_optimality(&connected_peers_and_metadata);
+
+    // Return the sorted peers
+    Some(sorted_peers)
+}
+
+/// Sorts the peers by subscription optimality (in descending order of
+/// optimality). This requires: (i) sorting the peers by distance from the
+/// validator set and ping latency (lower values are more optimal); and (ii)
+/// filtering out peers that don't support consensus observer.
+///
+/// Note: we prioritize distance over latency as we want to avoid close
+/// but not up-to-date peers. If peers don't have sufficient metadata
+/// for sorting, they are given a lower priority.
+pub fn sort_peers_by_subscription_optimality( + peers_and_metadata: &HashMap, +) -> Vec { + // Group peers and latencies by validator distance, i.e., distance -> [(peer, latency)] + let mut unsupported_peers = Vec::new(); + let mut peers_and_latencies_by_distance = BTreeMap::new(); + for (peer_network_id, peer_metadata) in peers_and_metadata { + // Verify that the peer supports consensus observer + if !supports_consensus_observer(peer_metadata) { + unsupported_peers.push(*peer_network_id); + continue; // Skip the peer + } + + // Get the distance and latency for the peer + let distance = get_distance_for_peer(peer_network_id, peer_metadata); + let latency = get_latency_for_peer(peer_network_id, peer_metadata); + + // If the distance is not found, use the maximum distance + let distance = + distance.unwrap_or(aptos_peer_monitoring_service_types::MAX_DISTANCE_FROM_VALIDATORS); + + // If the latency is not found, use a large latency + let latency = latency.unwrap_or(MAX_PING_LATENCY_SECS); + + // Add the peer and latency to the distance group + peers_and_latencies_by_distance + .entry(distance) + .or_insert_with(Vec::new) + .push((*peer_network_id, OrderedFloat(latency))); + } + + // If there are peers that don't support consensus observer, log them + if !unsupported_peers.is_empty() { + info!( + LogSchema::new(LogEntry::ConsensusObserver).message(&format!( + "Found {} peers that don't support consensus observer! Peers: {:?}", + unsupported_peers.len(), + unsupported_peers + )) + ); + } + + // Sort the peers by distance and latency. Note: BTreeMaps are + // sorted by key, so the entries will be sorted by distance in ascending order. 
+ let mut sorted_peers = Vec::new(); + for (_, mut peers_and_latencies) in peers_and_latencies_by_distance { + // Sort the peers by latency + peers_and_latencies.sort_by_key(|(_, latency)| *latency); + + // Add the peers to the sorted list (in sorted order) + sorted_peers.extend( + peers_and_latencies + .into_iter() + .map(|(peer_network_id, _)| peer_network_id), + ); + } + + // Log the sorted peers + info!( + LogSchema::new(LogEntry::ConsensusObserver).message(&format!( + "Sorted {} peers by subscription optimality! Peers: {:?}", + sorted_peers.len(), + sorted_peers + )) + ); + + sorted_peers +} + +/// Returns true iff the peer metadata indicates support for consensus observer +fn supports_consensus_observer(peer_metadata: &PeerMetadata) -> bool { + peer_metadata.supports_protocol(ProtocolId::ConsensusObserver) + && peer_metadata.supports_protocol(ProtocolId::ConsensusObserverRpc) +} + +#[cfg(test)] +mod tests { + use super::*; + use aptos_config::{config::PeerRole, network_id::NetworkId}; + use aptos_netcore::transport::ConnectionOrigin; + use aptos_network::{ + application::storage::PeersAndMetadata, + protocols::wire::handshake::v1::{MessagingProtocolVersion, ProtocolIdSet}, + transport::{ConnectionId, ConnectionMetadata}, + }; + use aptos_peer_monitoring_service_types::{ + response::NetworkInformationResponse, PeerMonitoringMetadata, + }; + use aptos_types::{network_address::NetworkAddress, PeerId}; + use maplit::hashmap; + use std::collections::HashSet; + + #[test] + fn test_sort_peers_by_distance_and_latency() { + // Sort an empty list of peers + let peers_and_metadata = HashMap::new(); + assert!(sort_peers_by_subscription_optimality(&peers_and_metadata).is_empty()); + + // Create a list of peers with empty metadata + let peers_and_metadata = create_peers_and_metadata(true, true, true, 10); + + // Sort the peers and verify the results + let sorted_peers = sort_peers_by_subscription_optimality(&peers_and_metadata); + assert_eq!(sorted_peers.len(), 10); + + // 
Create a list of peers with valid metadata + let peers_and_metadata = create_peers_and_metadata(false, false, true, 10); + + // Sort the peers + let sorted_peers = sort_peers_by_subscription_optimality(&peers_and_metadata); + + // Verify the order of the peers + verify_increasing_distance_latencies(&peers_and_metadata, &sorted_peers); + assert_eq!(sorted_peers.len(), 10); + + // Create a list of peers with and without metadata + let mut peers_and_metadata = create_peers_and_metadata(false, false, true, 10); + peers_and_metadata.extend(create_peers_and_metadata(true, false, true, 10)); + peers_and_metadata.extend(create_peers_and_metadata(false, true, true, 10)); + peers_and_metadata.extend(create_peers_and_metadata(true, true, true, 10)); + + // Sort the peers + let sorted_peers = sort_peers_by_subscription_optimality(&peers_and_metadata); + assert_eq!(sorted_peers.len(), 40); + + // Verify the order of the first 20 peers + let (first_20_peers, sorted_peers) = sorted_peers.split_at(20); + verify_increasing_distance_latencies(&peers_and_metadata, first_20_peers); + + // Verify that the next 10 peers only have latency metadata + let (next_10_peers, sorted_peers) = sorted_peers.split_at(10); + for sorted_peer in next_10_peers { + let peer_metadata = peers_and_metadata.get(sorted_peer).unwrap(); + assert!(get_distance_for_peer(sorted_peer, peer_metadata).is_none()); + assert!(get_latency_for_peer(sorted_peer, peer_metadata).is_some()); + } + + // Verify that the last 10 peers have no metadata + let (last_10_peers, remaining_peers) = sorted_peers.split_at(10); + for sorted_peer in last_10_peers { + let peer_metadata = peers_and_metadata.get(sorted_peer).unwrap(); + assert!(get_distance_for_peer(sorted_peer, peer_metadata).is_none()); + assert!(get_latency_for_peer(sorted_peer, peer_metadata).is_none()); + } + assert!(remaining_peers.is_empty()); + } + + #[test] + fn test_sort_peers_by_distance_and_latency_filter() { + // Sort an empty list of peers + let 
peers_and_metadata = HashMap::new(); + assert!(sort_peers_by_subscription_optimality(&peers_and_metadata).is_empty()); + + // Create a list of peers with empty metadata (with consensus observer support) + let peers_and_metadata = create_peers_and_metadata(true, true, true, 10); + + // Sort the peers and verify the results + let sorted_peers = sort_peers_by_subscription_optimality(&peers_and_metadata); + assert_eq!(sorted_peers.len(), 10); + + // Create a list of peers with empty metadata (without consensus observer support) + let peers_and_metadata = create_peers_and_metadata(true, true, false, 10); + + // Sort the peers and verify the results + let sorted_peers = sort_peers_by_subscription_optimality(&peers_and_metadata); + assert!(sorted_peers.is_empty()); + + // Create a list of peers with valid metadata (without consensus observer support) + let peers_and_metadata = create_peers_and_metadata(false, false, false, 10); + + // Sort the peers and verify the results + let sorted_peers = sort_peers_by_subscription_optimality(&peers_and_metadata); + assert!(sorted_peers.is_empty()); + + // Create a list of peers with empty metadata (with and without consensus observer support) + let mut peers_and_metadata = create_peers_and_metadata(true, true, true, 5); + peers_and_metadata.extend(create_peers_and_metadata(true, true, false, 50)); + + // Sort the peers and verify the results (only the supported peers are sorted) + let sorted_peers = sort_peers_by_subscription_optimality(&peers_and_metadata); + assert_eq!(sorted_peers.len(), 5); + + // Create a list of peers with valid metadata (with and without consensus observer support) + let mut peers_and_metadata = create_peers_and_metadata(false, false, true, 50); + peers_and_metadata.extend(create_peers_and_metadata(false, false, false, 10)); + + // Sort the peers and verify the results (only the supported peers are sorted) + let sorted_peers = sort_peers_by_subscription_optimality(&peers_and_metadata); + 
assert_eq!(sorted_peers.len(), 50); + + // Create a list of peers with valid metadata (with and without consensus observer support) + let supported_peer_and_metadata = create_peers_and_metadata(false, false, true, 1); + let unsupported_peer_and_metadata = create_peers_and_metadata(false, false, false, 1); + let mut peers_and_metadata = HashMap::new(); + peers_and_metadata.extend(supported_peer_and_metadata.clone()); + peers_and_metadata.extend(unsupported_peer_and_metadata); + + // Sort the peers and verify the results (only the supported peer is sorted) + let supported_peer = supported_peer_and_metadata.keys().next().unwrap(); + let sorted_peers = sort_peers_by_subscription_optimality(&peers_and_metadata); + assert_eq!(sorted_peers, vec![*supported_peer]); + } + + #[tokio::test] + async fn test_sort_peers_for_subscriptions() { + // Create a consensus observer client + let network_ids = &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public]; + let (peers_and_metadata, consensus_observer_client) = + create_consensus_observer_client(network_ids); + + // Create a consensus publisher + let consensus_observer_config = ConsensusObserverConfig::default(); + let (consensus_publisher, _) = + ConsensusPublisher::new(consensus_observer_config, consensus_observer_client.clone()); + let consensus_publisher = Arc::new(consensus_publisher); + + // Sort the peers and verify that no peers are returned + let sorted_peers = sort_subscription_peers( + consensus_publisher.clone(), + peers_and_metadata.clone(), + vec![], + vec![], + ); + assert!(sorted_peers.is_empty()); + + // Add a connected validator peer, VFN peer and public peer + for network_id in network_ids { + let distance_from_validators = match network_id { + NetworkId::Validator => 0, + NetworkId::Vfn => 1, + NetworkId::Public => 2, + }; + create_peer_and_connection( + *network_id, + peers_and_metadata.clone(), + distance_from_validators, + None, + true, + ); + } + + // Sort the peers and verify the ordering (according 
to distance) + let sorted_peers = sort_subscription_peers( + consensus_publisher.clone(), + peers_and_metadata.clone(), + vec![], + vec![], + ); + assert_eq!(sorted_peers[0].network_id(), NetworkId::Validator); + assert_eq!(sorted_peers[1].network_id(), NetworkId::Vfn); + assert_eq!(sorted_peers[2].network_id(), NetworkId::Public); + assert_eq!(sorted_peers.len(), 3); + + // Sort the peers, but mark the validator as unhealthy (so it's ignored) + let sorted_peer_subset = sort_subscription_peers( + consensus_publisher.clone(), + peers_and_metadata.clone(), + vec![], + vec![sorted_peers[0]], + ); + assert_eq!(sorted_peer_subset[0].network_id(), NetworkId::Vfn); + assert_eq!(sorted_peer_subset[1].network_id(), NetworkId::Public); + assert_eq!(sorted_peer_subset.len(), 2); + + // Sort the peers, but mark the VFN and validator as active subscriptions (so they're ignored) + let sorted_peer_subset = sort_subscription_peers( + consensus_publisher.clone(), + peers_and_metadata.clone(), + vec![sorted_peers[0], sorted_peers[1]], + vec![], + ); + assert_eq!(sorted_peer_subset[0].network_id(), NetworkId::Public); + assert_eq!(sorted_peer_subset.len(), 1); + + // Create a consensus publisher with the PFN as an active subscriber + let consensus_publisher_with_subscribers = + Arc::new(ConsensusPublisher::new_with_active_subscribers( + consensus_observer_config, + consensus_observer_client.clone(), + HashSet::from_iter(vec![sorted_peers[2]]), + )); + + // Sort the peers, and verify the PFN is ignored (since it's an active subscriber) + let sorted_peer_subset = sort_subscription_peers( + consensus_publisher_with_subscribers, + peers_and_metadata.clone(), + vec![], + vec![], + ); + assert_eq!(sorted_peer_subset[0].network_id(), NetworkId::Validator); + assert_eq!(sorted_peer_subset[1].network_id(), NetworkId::Vfn); + assert_eq!(sorted_peer_subset.len(), 2); + + // Remove all the peers and verify that no peers are returned upon sorting + for peer_network_id in sorted_peers { + 
remove_peer_and_connection(peers_and_metadata.clone(), peer_network_id); + } + let sorted_peers = sort_subscription_peers( + consensus_publisher.clone(), + peers_and_metadata.clone(), + vec![], + vec![], + ); + assert!(sorted_peers.is_empty()); + + // Add multiple validator peers, with different latencies + let mut validator_peers = vec![]; + for ping_latency_secs in [0.9, 0.8, 0.5, 0.1, 0.05] { + let validator_peer = create_peer_and_connection( + NetworkId::Validator, + peers_and_metadata.clone(), + 0, + Some(ping_latency_secs), + true, + ); + validator_peers.push(validator_peer); + } + + // Sort the peers and verify the ordering (according to latency) + let sorted_peers = sort_subscription_peers( + consensus_publisher, + peers_and_metadata.clone(), + vec![], + vec![], + ); + let expected_peers = validator_peers.into_iter().rev().collect::>(); + assert_eq!(sorted_peers, expected_peers); + } + + /// Creates a new connection metadata for testing + fn create_connection_metadata( + peer_network_id: PeerNetworkId, + support_consensus_observer: bool, + ) -> ConnectionMetadata { + if support_consensus_observer { + // Create a protocol set that supports consensus observer + let protocol_set = ProtocolIdSet::from_iter(vec![ + ProtocolId::ConsensusObserver, + ProtocolId::ConsensusObserverRpc, + ]); + + // Create the connection metadata with the protocol set + ConnectionMetadata::new( + peer_network_id.peer_id(), + ConnectionId::default(), + NetworkAddress::mock(), + ConnectionOrigin::Inbound, + MessagingProtocolVersion::V1, + protocol_set, + PeerRole::PreferredUpstream, + ) + } else { + ConnectionMetadata::mock(peer_network_id.peer_id()) + } + } + + /// Creates a new consensus observer client and a peers and metadata container + fn create_consensus_observer_client( + network_ids: &[NetworkId], + ) -> ( + Arc, + Arc>>, + ) { + let peers_and_metadata = PeersAndMetadata::new(network_ids); + let network_client = + NetworkClient::new(vec![], vec![], hashmap![], 
peers_and_metadata.clone()); + let consensus_observer_client = Arc::new(ConsensusObserverClient::new(network_client)); + + (peers_and_metadata, consensus_observer_client) + } + + /// Creates a new peer with the specified connection metadata + fn create_peer_and_connection( + network_id: NetworkId, + peers_and_metadata: Arc, + distance_from_validators: u64, + ping_latency_secs: Option, + support_consensus_observer: bool, + ) -> PeerNetworkId { + // Create the connection metadata + let peer_network_id = PeerNetworkId::new(network_id, PeerId::random()); + let connection_metadata = if support_consensus_observer { + // Create a protocol set that supports consensus observer + let protocol_set = ProtocolIdSet::from_iter(vec![ + ProtocolId::ConsensusObserver, + ProtocolId::ConsensusObserverRpc, + ]); + + // Create the connection metadata with the protocol set + ConnectionMetadata::new( + peer_network_id.peer_id(), + ConnectionId::default(), + NetworkAddress::mock(), + ConnectionOrigin::Inbound, + MessagingProtocolVersion::V1, + protocol_set, + PeerRole::PreferredUpstream, + ) + } else { + ConnectionMetadata::mock(peer_network_id.peer_id()) + }; + + // Insert the connection into peers and metadata + peers_and_metadata + .insert_connection_metadata(peer_network_id, connection_metadata.clone()) + .unwrap(); + + // Update the peer monitoring metadata + let latest_network_info_response = NetworkInformationResponse { + connected_peers: BTreeMap::new(), + distance_from_validators, + }; + let monitoring_metdata = PeerMonitoringMetadata::new( + ping_latency_secs, + ping_latency_secs, + Some(latest_network_info_response), + None, + None, + ); + peers_and_metadata + .update_peer_monitoring_metadata(peer_network_id, monitoring_metdata.clone()) + .unwrap(); + + peer_network_id + } + + /// Creates a new peer and metadata for testing + fn create_peer_and_metadata( + latency: Option, + distance_from_validators: Option, + support_consensus_observer: bool, + ) -> (PeerNetworkId, 
PeerMetadata) { + // Create a random peer + let peer_network_id = PeerNetworkId::random(); + + // Create a new peer metadata with the given latency and distance + let connection_metadata = + create_connection_metadata(peer_network_id, support_consensus_observer); + let network_information_response = + distance_from_validators.map(|distance| NetworkInformationResponse { + connected_peers: BTreeMap::new(), + distance_from_validators: distance, + }); + let peer_monitoring_metadata = + PeerMonitoringMetadata::new(latency, None, network_information_response, None, None); + let peer_metadata = + PeerMetadata::new_for_test(connection_metadata, peer_monitoring_metadata); + + (peer_network_id, peer_metadata) + } + + /// Creates a list of peers and metadata for testing + fn create_peers_and_metadata( + empty_latency: bool, + empty_distance: bool, + support_consensus_observer: bool, + num_peers: u64, + ) -> HashMap { + let mut peers_and_metadata = HashMap::new(); + for i in 1..num_peers + 1 { + // Determine the distance for the peer + let distance = if empty_distance { None } else { Some(i) }; + + // Determine the latency for the peer + let latency = if empty_latency { None } else { Some(i as f64) }; + + // Create a new peer and metadata + let (peer_network_id, peer_metadata) = + create_peer_and_metadata(latency, distance, support_consensus_observer); + peers_and_metadata.insert(peer_network_id, peer_metadata); + } + peers_and_metadata + } + + /// Removes the peer and connection metadata for the given peer + fn remove_peer_and_connection( + peers_and_metadata: Arc, + peer_network_id: PeerNetworkId, + ) { + let peer_metadata = peers_and_metadata + .get_metadata_for_peer(peer_network_id) + .unwrap(); + let connection_id = peer_metadata.get_connection_metadata().connection_id; + peers_and_metadata + .remove_peer_metadata(peer_network_id, connection_id) + .unwrap(); + } + + /// A simple helper method that sorts the given peers for a subscription + fn sort_subscription_peers( + 
consensus_publisher: Arc, + peers_and_metadata: Arc, + active_subscription_peers: Vec, + unhealthy_subscription_peers: Vec, + ) -> Vec { + // Get the connected peers and metadata + let connected_peers_and_metadata = peers_and_metadata + .get_connected_peers_and_metadata() + .unwrap(); + + // Sort the peers for subscription requests + sort_peers_for_subscriptions( + connected_peers_and_metadata, + unhealthy_subscription_peers, + active_subscription_peers, + Some(consensus_publisher), + ) + .unwrap() + } + + /// Verifies that the distance and latencies for the peers are in + /// increasing order (with the distance taking precedence over the latency). + fn verify_increasing_distance_latencies( + peers_and_metadata: &HashMap, + sorted_peers: &[PeerNetworkId], + ) { + let mut previous_latency = None; + let mut previous_distance = 0; + for sorted_peer in sorted_peers { + // Get the distance and latency for the peer + let peer_metadata = peers_and_metadata.get(sorted_peer).unwrap(); + let distance = get_distance_for_peer(sorted_peer, peer_metadata).unwrap(); + let latency = get_latency_for_peer(sorted_peer, peer_metadata); + + // Verify the order of the peers + if distance == previous_distance { + if let Some(latency) = latency { + if let Some(previous_latency) = previous_latency { + assert!(latency >= previous_latency); + } + } + } else { + assert!(distance > previous_distance); + } + + // Update the previous latency and distance + previous_latency = latency; + previous_distance = distance; + } + } +} diff --git a/consensus/src/consensus_observer/publisher/consensus_publisher.rs b/consensus/src/consensus_observer/publisher/consensus_publisher.rs index 1379c87131cc5..899901593f7ed 100644 --- a/consensus/src/consensus_observer/publisher/consensus_publisher.rs +++ b/consensus/src/consensus_observer/publisher/consensus_publisher.rs @@ -70,6 +70,26 @@ impl ConsensusPublisher { (consensus_publisher, outbound_message_receiver) } + #[cfg(test)] + /// Creates a new consensus 
publisher with the given active subscribers + pub fn new_with_active_subscribers( + consensus_observer_config: ConsensusObserverConfig, + consensus_observer_client: Arc< + ConsensusObserverClient>, + >, + active_subscribers: HashSet, + ) -> Self { + // Create the consensus publisher + let (consensus_publisher, _) = + ConsensusPublisher::new(consensus_observer_config, consensus_observer_client); + + // Update the active subscribers + *consensus_publisher.active_subscribers.write() = active_subscribers; + + // Return the publisher + consensus_publisher + } + /// Adds the given subscriber to the set of active subscribers fn add_active_subscriber(&self, peer_network_id: PeerNetworkId) { self.active_subscribers.write().insert(peer_network_id); From 9f9670a93b79ca1257fcf14042bef52a8a5e2907 Mon Sep 17 00:00:00 2001 From: Josh Lind Date: Thu, 12 Sep 2024 09:05:10 -0400 Subject: [PATCH 2/4] [Consensus Observer] Move subscription health check to subscription.rs --- .../observer/subscription.rs | 208 +++++++++++++++++- .../observer/subscription_manager.rs | 23 +- 2 files changed, 206 insertions(+), 25 deletions(-) diff --git a/consensus/src/consensus_observer/observer/subscription.rs b/consensus/src/consensus_observer/observer/subscription.rs index 7b368fe3417c6..5d9ae4d43def1 100644 --- a/consensus/src/consensus_observer/observer/subscription.rs +++ b/consensus/src/consensus_observer/observer/subscription.rs @@ -58,10 +58,38 @@ impl ConsensusObserverSubscription { } } + /// Checks if the subscription is still healthy. If not, an error + /// is returned indicating the reason for the subscription failure. 
+ pub fn check_subscription_health( + &mut self, + connected_peers_and_metadata: &HashMap, + ) -> Result<(), Error> { + // Verify the subscription peer is still connected + let peer_network_id = self.get_peer_network_id(); + if !connected_peers_and_metadata.contains_key(&peer_network_id) { + return Err(Error::SubscriptionDisconnected(format!( + "The peer: {:?} is no longer connected!", + peer_network_id + ))); + } + + // Verify the subscription has not timed out + self.check_subscription_timeout()?; + + // Verify that the DB is continuing to sync and commit new data + self.check_syncing_progress()?; + + // Verify that the subscription peer is still optimal + self.check_subscription_peer_optimality(connected_peers_and_metadata)?; + + // The subscription seems healthy + Ok(()) + } + /// Verifies that the peer currently selected for the subscription is /// optimal. This is only done if: (i) the peers have changed since the /// last check; or (ii) enough time has elapsed to force a refresh. - pub fn check_subscription_peer_optimality( + fn check_subscription_peer_optimality( &mut self, peers_and_metadata: &HashMap, ) -> Result<(), Error> { @@ -120,7 +148,7 @@ impl ConsensusObserverSubscription { /// Verifies that the subscription has not timed out based /// on the last received message time. 
- pub fn check_subscription_timeout(&self) -> Result<(), Error> { + fn check_subscription_timeout(&self) -> Result<(), Error> { // Calculate the duration since the last message let time_now = self.time_service.now(); let duration_since_last_message = time_now.duration_since(self.last_message_receive_time); @@ -139,7 +167,7 @@ impl ConsensusObserverSubscription { } /// Verifies that the DB is continuing to sync and commit new data - pub fn check_syncing_progress(&mut self) -> Result<(), Error> { + fn check_syncing_progress(&mut self) -> Result<(), Error> { // Get the current synced version from storage let current_synced_version = self.db_reader @@ -212,6 +240,161 @@ mod test { } } + #[test] + fn test_check_subscription_health_connected_and_timeout() { + // Create a consensus observer config + let consensus_observer_config = ConsensusObserverConfig { + max_synced_version_timeout_ms: 100_000_000, // Use a large value so that we don't get DB progress errors + ..ConsensusObserverConfig::default() + }; + + // Create a new observer subscription + let time_service = TimeService::mock(); + let peer_network_id = PeerNetworkId::random(); + let mut subscription = ConsensusObserverSubscription::new( + consensus_observer_config, + Arc::new(MockDatabaseReader::new()), + peer_network_id, + time_service.clone(), + ); + + // Verify that the subscription is unhealthy (the peer is not connected) + assert_matches!( + subscription.check_subscription_health(&HashMap::new()), + Err(Error::SubscriptionDisconnected(_)) + ); + + // Create a peers and metadata map for the subscription + let mut peers_and_metadata = HashMap::new(); + add_metadata_for_peer(&mut peers_and_metadata, peer_network_id, true, false); + + // Elapse enough time to timeout the subscription + let mock_time_service = time_service.into_mock(); + mock_time_service.advance(Duration::from_millis( + consensus_observer_config.max_subscription_timeout_ms + 1, + )); + + // Verify that the subscription has timed out + 
assert_matches!( + subscription.check_subscription_health(&peers_and_metadata), + Err(Error::SubscriptionTimeout(_)) + ); + } + + #[test] + fn test_check_subscription_health_progress() { + // Create a consensus observer config with a large timeout + let consensus_observer_config = ConsensusObserverConfig { + max_subscription_timeout_ms: 100_000_000, // Use a large value so that we don't time out + ..ConsensusObserverConfig::default() + }; + + // Create a mock DB reader with expectations + let first_synced_version = 1; + let second_synced_version = 2; + let mut mock_db_reader = MockDatabaseReader::new(); + mock_db_reader + .expect_get_latest_ledger_info_version() + .returning(move || Ok(first_synced_version)) + .times(1); // Only allow one call for the first version + mock_db_reader + .expect_get_latest_ledger_info_version() + .returning(move || Ok(second_synced_version)); // Allow multiple calls for the second version + + // Create a new observer subscription + let peer_network_id = PeerNetworkId::random(); + let time_service = TimeService::mock(); + let mut subscription = ConsensusObserverSubscription::new( + consensus_observer_config, + Arc::new(mock_db_reader), + peer_network_id, + time_service.clone(), + ); + + // Verify that the DB is making sync progress and that the highest synced version is updated + let mock_time_service = time_service.into_mock(); + verify_subscription_syncing_progress( + &mut subscription, + first_synced_version, + mock_time_service.now(), + ); + + // Elapse enough time to timeout the subscription + mock_time_service.advance(Duration::from_millis( + consensus_observer_config.max_synced_version_timeout_ms + 1, + )); + + // Verify that the DB is still making sync progress (the next version is higher) + verify_subscription_syncing_progress( + &mut subscription, + second_synced_version, + mock_time_service.now(), + ); + + // Elapse enough time to timeout the subscription + mock_time_service.advance(Duration::from_millis( + 
consensus_observer_config.max_synced_version_timeout_ms + 1, + )); + + // Verify that the DB is not making sync progress and that the subscription has timed out + assert_matches!( + subscription.check_syncing_progress(), + Err(Error::SubscriptionProgressStopped(_)) + ); + } + + #[test] + fn test_check_subscription_health_optimality() { + // Create a consensus observer config with a single subscription and large timeouts + let consensus_observer_config = ConsensusObserverConfig { + max_concurrent_subscriptions: 1, + max_subscription_timeout_ms: 100_000_000, // Use a large value so that we don't time out + max_synced_version_timeout_ms: 100_000_000, // Use a large value so that we don't get DB progress errors + ..ConsensusObserverConfig::default() + }; + + // Create a mock DB reader with expectations + let mut mock_db_reader = MockDatabaseReader::new(); + mock_db_reader + .expect_get_latest_ledger_info_version() + .returning(move || Ok(1)); + + // Create a new observer subscription + let time_service = TimeService::mock(); + let peer_network_id = PeerNetworkId::random(); + let mut subscription = ConsensusObserverSubscription::new( + consensus_observer_config, + Arc::new(mock_db_reader), + peer_network_id, + time_service.clone(), + ); + + // Create a peers and metadata map for the subscription + let mut peers_and_metadata = HashMap::new(); + add_metadata_for_peer(&mut peers_and_metadata, peer_network_id, true, false); + + // Verify that the subscription is healthy + assert!(subscription + .check_subscription_health(&peers_and_metadata) + .is_ok()); + + // Add a more optimal peer to the set of peers + let new_optimal_peer = PeerNetworkId::random(); + add_metadata_for_peer(&mut peers_and_metadata, new_optimal_peer, true, true); + + // Elapse enough time for a peer optimality check + let mock_time_service = time_service.into_mock(); + mock_time_service.advance(Duration::from_millis( + consensus_observer_config.subscription_peer_change_interval_ms + 1, + )); + + // Verify 
that the subscription is no longer optimal + assert_matches!( + subscription.check_subscription_health(&peers_and_metadata), + Err(Error::SubscriptionSuboptimal(_)) + ); + } + #[test] fn test_check_subscription_peer_optimality_single() { // Create a consensus observer config with a maximum of 1 subscription @@ -344,7 +527,7 @@ mod test { } #[test] - fn test_check_subscription_peer_refresh() { + fn test_check_subscription_peer_optimality_refresh() { // Create a consensus observer config with a maximum of 1 subscription let consensus_observer_config = create_observer_config(1); @@ -574,6 +757,23 @@ mod test { ); } + #[test] + fn test_get_peer_network_id() { + // Create a new observer subscription + let consensus_observer_config = ConsensusObserverConfig::default(); + let peer_network_id = PeerNetworkId::random(); + let time_service = TimeService::mock(); + let subscription = ConsensusObserverSubscription::new( + consensus_observer_config, + Arc::new(MockDatabaseReader::new()), + peer_network_id, + time_service.clone(), + ); + + // Verify that the peer network id matches the expected value + assert_eq!(subscription.get_peer_network_id(), peer_network_id); + } + #[test] fn test_update_last_message_receive_time() { // Create a new observer subscription diff --git a/consensus/src/consensus_observer/observer/subscription_manager.rs b/consensus/src/consensus_observer/observer/subscription_manager.rs index 16cd756c176dd..deba8d52a98e6 100644 --- a/consensus/src/consensus_observer/observer/subscription_manager.rs +++ b/consensus/src/consensus_observer/observer/subscription_manager.rs @@ -87,26 +87,7 @@ impl SubscriptionManager { // Check the health of the subscription match active_subscription { Some(active_subscription) => { - // Verify the peer is still connected - if !connected_peers_and_metadata.contains_key(&peer_network_id) { - return Err(Error::SubscriptionDisconnected(format!( - "The peer: {:?} is no longer connected!", - peer_network_id - ))); - } - - // Verify the 
subscription has not timed out - active_subscription.check_subscription_timeout()?; - - // Verify that the DB is continuing to sync and commit new data - active_subscription.check_syncing_progress()?; - - // Verify that the subscription peer is still optimal - active_subscription - .check_subscription_peer_optimality(connected_peers_and_metadata)?; - - // The subscription seems healthy - Ok(()) + active_subscription.check_subscription_health(connected_peers_and_metadata) }, None => Err(Error::UnexpectedError(format!( "The subscription to peer: {:?} is not active!", @@ -217,7 +198,7 @@ impl SubscriptionManager { let db_reader = self.db_reader.clone(); let time_service = self.time_service.clone(); - // Otherwise, we should spawn a new subscription creation task + // Spawn a new subscription creation task let subscription_creation_task = tokio::spawn(async move { // Identify the terminated subscription peers let terminated_subscription_peers = terminated_subscriptions From a7569049c906150041792fe9ebf27570e672b752 Mon Sep 17 00:00:00 2001 From: Josh Lind Date: Thu, 12 Sep 2024 11:44:48 -0400 Subject: [PATCH 3/4] [Consensus Observer] Improve subscription manager unit tests. 
--- .../observer/subscription_manager.rs | 337 ++++++++++++++---- 1 file changed, 260 insertions(+), 77 deletions(-) diff --git a/consensus/src/consensus_observer/observer/subscription_manager.rs b/consensus/src/consensus_observer/observer/subscription_manager.rs index deba8d52a98e6..2d89163e1ae86 100644 --- a/consensus/src/consensus_observer/observer/subscription_manager.rs +++ b/consensus/src/consensus_observer/observer/subscription_manager.rs @@ -430,6 +430,95 @@ mod test { } } + #[tokio::test] + async fn test_check_and_manage_subscriptions() { + // Create a consensus observer client + let network_id = NetworkId::Public; + let (peers_and_metadata, consensus_observer_client) = + create_consensus_observer_client(&[network_id]); + + // Create a new subscription manager + let consensus_observer_config = ConsensusObserverConfig::default(); + let db_reader = create_mock_db_reader(); + let time_service = TimeService::mock(); + let mut subscription_manager = SubscriptionManager::new( + consensus_observer_client, + consensus_observer_config, + None, + db_reader.clone(), + time_service.clone(), + ); + + // Verify that no subscriptions are active + verify_active_subscription_peers(&subscription_manager, vec![]); + + // Check and manage the subscriptions + let result = subscription_manager.check_and_manage_subscriptions().await; + + // Verify that no subscriptions were terminated + assert!(result.is_ok()); + verify_active_subscription_peers(&subscription_manager, vec![]); + + // Add a new connected peer and subscription + let connected_peer_1 = + create_peer_and_connection(network_id, peers_and_metadata.clone(), 1, None, true); + create_observer_subscription( + &mut subscription_manager, + consensus_observer_config, + db_reader.clone(), + connected_peer_1, + time_service.clone(), + ); + + // Add another connected peer and subscription + let connected_peer_2 = + create_peer_and_connection(network_id, peers_and_metadata.clone(), 2, None, true); + create_observer_subscription( 
+ &mut subscription_manager, + consensus_observer_config, + db_reader.clone(), + connected_peer_2, + TimeService::mock(), // Use a different time service (to avoid timeouts!) + ); + + // Check and manage the subscriptions + subscription_manager + .check_and_manage_subscriptions() + .await + .unwrap(); + + // Verify that the subscriptions are still active + verify_active_subscription_peers(&subscription_manager, vec![ + connected_peer_1, + connected_peer_2, + ]); + + // Elapse time to simulate a timeout for peer 1 + let mock_time_service = time_service.into_mock(); + mock_time_service.advance(Duration::from_millis( + consensus_observer_config.max_subscription_timeout_ms + 1, + )); + + // Check and manage the subscriptions + subscription_manager + .check_and_manage_subscriptions() + .await + .unwrap(); + + // Verify that the first subscription was terminated + verify_active_subscription_peers(&subscription_manager, vec![connected_peer_2]); + + // Disconnect the second peer + remove_peer_and_connection(peers_and_metadata.clone(), connected_peer_2); + + // Check and manage the subscriptions + let result = subscription_manager.check_and_manage_subscriptions().await; + + // Verify that the second subscription was terminated and an error was returned + verify_active_subscription_peers(&subscription_manager, vec![]); + assert_matches!(result, Err(Error::SubscriptionsReset(_))); + } + #[tokio::test] async fn test_check_subscription_health_connected() { // Create a consensus observer client @@ -461,11 +550,8 @@ mod test { // Check the active subscription and verify that it unhealthy (the peer is not connected) check_subscription_connection(&mut subscription_manager, peer_network_id, false); - // Terminate the subscription - let terminated_subscriptions = - terminate_any_unhealthy_subscriptions(&mut subscription_manager); - assert_eq!(terminated_subscriptions.len(), 1); - assert_eq!(terminated_subscriptions.first().unwrap().0, peer_network_id); + // Terminate unhealthy 
subscriptions and verify the subscription was removed + verify_terminated_unhealthy_subscriptions(&mut subscription_manager, vec![peer_network_id]); // Add a new connected peer let connected_peer = @@ -480,13 +566,14 @@ mod test { TimeService::mock(), ); - // Check the active subscriptions is still healthy + // Check the active subscription is still healthy check_subscription_connection(&mut subscription_manager, connected_peer, true); + // Terminate unhealthy subscriptions and verify none are removed + verify_terminated_unhealthy_subscriptions(&mut subscription_manager, vec![]); + // Verify that the active subscription is still present - assert!(subscription_manager - .get_active_subscription_peers() - .contains(&connected_peer)); + verify_active_subscription_peers(&subscription_manager, vec![connected_peer]); } #[tokio::test] @@ -529,6 +616,9 @@ mod test { // Check the active subscription and verify that it is healthy check_subscription_progress(&mut subscription_manager, connected_peer, true); + // Terminate unhealthy subscriptions and verify none are removed + verify_terminated_unhealthy_subscriptions(&mut subscription_manager, vec![]); + // Elapse time to simulate a DB progress error let mock_time_service = time_service.clone().into_mock(); mock_time_service.advance(Duration::from_millis( @@ -538,16 +628,11 @@ mod test { // Check the active subscription and verify that it is unhealthy (the DB is not syncing) check_subscription_progress(&mut subscription_manager, connected_peer, false); - // Terminate the subscription - let terminated_subscriptions = - terminate_any_unhealthy_subscriptions(&mut subscription_manager); - assert_eq!(terminated_subscriptions.len(), 1); - assert_eq!(terminated_subscriptions.first().unwrap().0, connected_peer); + // Terminate unhealthy subscriptions and verify the subscription was removed + verify_terminated_unhealthy_subscriptions(&mut subscription_manager, vec![connected_peer]); // Verify the active subscription is no longer 
present - assert!(subscription_manager - .get_active_subscription_peers() - .is_empty()); + verify_active_subscription_peers(&subscription_manager, vec![]); } #[tokio::test] @@ -585,6 +670,9 @@ mod test { // Check the active subscription and verify that it is healthy check_subscription_timeout(&mut subscription_manager, connected_peer, true); + // Terminate unhealthy subscriptions and verify none are removed + verify_terminated_unhealthy_subscriptions(&mut subscription_manager, vec![]); + // Elapse time to simulate a timeout let mock_time_service = time_service.clone().into_mock(); mock_time_service.advance(Duration::from_millis( @@ -594,16 +682,11 @@ mod test { // Check the active subscription and verify that it is unhealthy (the subscription timed out) check_subscription_timeout(&mut subscription_manager, connected_peer, false); - // Terminate the subscription - let terminated_subscriptions = - terminate_any_unhealthy_subscriptions(&mut subscription_manager); - assert_eq!(terminated_subscriptions.len(), 1); - assert_eq!(terminated_subscriptions.first().unwrap().0, connected_peer); + // Terminate unhealthy subscriptions and verify the subscription was removed + verify_terminated_unhealthy_subscriptions(&mut subscription_manager, vec![connected_peer]); // Verify the active subscription is no longer present - assert!(subscription_manager - .get_active_subscription_peers() - .is_empty()); + verify_active_subscription_peers(&subscription_manager, vec![]); } #[tokio::test] @@ -651,6 +734,9 @@ mod test { // Check the active subscription and verify that it is healthy check_subscription_optimality(&mut subscription_manager, suboptimal_peer, true); + // Terminate unhealthy subscriptions and verify none are removed + verify_terminated_unhealthy_subscriptions(&mut subscription_manager, vec![]); + // Elapse enough time to trigger the peer optimality check let mock_time_service = time_service.clone().into_mock(); mock_time_service.advance(Duration::from_millis( @@ -666,20 
+752,89 @@ mod test { consensus_observer_config.subscription_refresh_interval_ms + 1, )); - // Terminate the subscription - let terminated_subscriptions = - terminate_any_unhealthy_subscriptions(&mut subscription_manager); - assert_eq!(terminated_subscriptions.len(), 1); - assert_eq!(terminated_subscriptions.first().unwrap().0, suboptimal_peer); + // Terminate any unhealthy subscriptions and verify the subscription was removed + verify_terminated_unhealthy_subscriptions(&mut subscription_manager, vec![suboptimal_peer]); // Verify the active subscription is no longer present - assert!(subscription_manager - .get_active_subscription_peers() - .is_empty()); + verify_active_subscription_peers(&subscription_manager, vec![]); } #[tokio::test] - async fn test_terminate_unhealthy_subscriptions() { + #[allow(clippy::await_holding_lock)] // Required to wait on the subscription creation task + async fn test_spawn_subscription_creation_task() { + // Create a consensus observer client + let network_id = NetworkId::Public; + let (_, consensus_observer_client) = create_consensus_observer_client(&[network_id]); + + // Create a new subscription manager + let consensus_observer_config = ConsensusObserverConfig::default(); + let db_reader = create_mock_db_reader(); + let time_service = TimeService::mock(); + let mut subscription_manager = SubscriptionManager::new( + consensus_observer_client, + consensus_observer_config, + None, + db_reader.clone(), + time_service.clone(), + ); + + // Verify that the active subscription creation task is empty + verify_subscription_creation_task(&subscription_manager, false); + + // Spawn a subscription creation task with 0 subscriptions to create + subscription_manager + .spawn_subscription_creation_task(0, vec![], vec![], hashmap![]) + .await; + + // Verify that the active subscription creation task is still empty (no task was spawned) + verify_subscription_creation_task(&subscription_manager, false); + + // Spawn a subscription creation task with 1 
subscription to create + subscription_manager + .spawn_subscription_creation_task(1, vec![], vec![], hashmap![]) + .await; + + // Verify that the active subscription creation task is now populated + verify_subscription_creation_task(&subscription_manager, true); + + // Wait for the active subscription creation task to finish + if let Some(active_task) = subscription_manager + .active_subscription_creation_task + .lock() + .as_mut() + { + active_task.await.unwrap(); + } + + // Verify that the active subscription creation task is still present + verify_subscription_creation_task(&subscription_manager, true); + + // Verify that the active subscription creation task is finished + if let Some(active_task) = subscription_manager + .active_subscription_creation_task + .lock() + .as_ref() + { + assert!(active_task.is_finished()); + } + + // Spawn a subscription creation task with 2 subscriptions to create + subscription_manager + .spawn_subscription_creation_task(2, vec![], vec![], hashmap![]) + .await; + + // Verify the new active subscription creation task is not finished + if let Some(active_task) = subscription_manager + .active_subscription_creation_task + .lock() + .as_ref() + { + assert!(!active_task.is_finished()); + }; + } + + #[tokio::test] + async fn test_terminate_unhealthy_subscriptions_multiple() { // Create a consensus observer client let network_id = NetworkId::Public; let (peers_and_metadata, consensus_observer_client) = @@ -713,14 +868,8 @@ mod test { ); } - // Terminate any unhealthy subscriptions and verify that both subscriptions are still healthy - let terminated_subscriptions = - terminate_any_unhealthy_subscriptions(&mut subscription_manager); - assert!(terminated_subscriptions.is_empty()); - assert_eq!( - subscription_manager.get_active_subscription_peers().len(), - 2 - ); + // Terminate unhealthy subscriptions and verify that both subscriptions are still healthy + verify_terminated_unhealthy_subscriptions(&mut subscription_manager, vec![]); // 
Create another subscription let subscription_peer_3 = @@ -739,18 +888,14 @@ mod test { consensus_observer_config.max_subscription_timeout_ms + 1, )); - // Terminate the unhealthy subscriptions and verify the first two subscriptions were terminated - let terminated_subscriptions = - terminate_any_unhealthy_subscriptions(&mut subscription_manager); - assert_eq!(terminated_subscriptions.len(), 2); - assert_eq!(subscription_manager.get_active_subscription_peers(), vec![ - subscription_peer_3 + // Terminate unhealthy subscriptions and verify the first two subscriptions were terminated + verify_terminated_unhealthy_subscriptions(&mut subscription_manager, vec![ + subscription_peer_1, + subscription_peer_2, ]); - // Verify that both subscriptions were terminated due to a timeout - for (_, error) in terminated_subscriptions { - assert_matches!(error, Error::SubscriptionTimeout(_)); - } + // Verify the third subscription is still active + verify_active_subscription_peers(&subscription_manager, vec![subscription_peer_3]); } #[tokio::test] @@ -771,9 +916,7 @@ mod test { ); // Verify that no subscriptions are active - assert!(subscription_manager - .get_active_subscription_peers() - .is_empty()); + verify_active_subscription_peers(&subscription_manager, vec![]); // Create a new subscription let subscription_peer_1 = PeerNetworkId::random(); @@ -786,9 +929,7 @@ mod test { ); // Verify the subscription is active - assert!(subscription_manager - .get_active_subscription_peers() - .contains(&subscription_peer_1)); + verify_active_subscription_peers(&subscription_manager, vec![subscription_peer_1]); // Create another subscription let subscription_peer_2 = PeerNetworkId::random(); @@ -801,26 +942,16 @@ mod test { ); // Verify the second subscription is active - assert!(subscription_manager - .get_active_subscription_peers() - .contains(&subscription_peer_2)); + verify_active_subscription_peers(&subscription_manager, vec![ + subscription_peer_1, + subscription_peer_2, + ]); // 
Unsubscribe from the first peer subscription_manager.unsubscribe_from_peer(subscription_peer_1); // Verify that the first subscription is no longer active - assert!(!subscription_manager - .get_active_subscription_peers() - .contains(&subscription_peer_1)); - - // Verify that only the second subscription is still active - assert!(subscription_manager - .get_active_subscription_peers() - .contains(&subscription_peer_2)); - assert_eq!( - subscription_manager.get_active_subscription_peers().len(), - 1 - ); + verify_active_subscription_peers(&subscription_manager, vec![subscription_peer_2]); } #[tokio::test] @@ -1077,14 +1208,66 @@ mod test { peer_network_id } - /// A simple helper method that terminates any unhealthy subscriptions - fn terminate_any_unhealthy_subscriptions( + /// Removes the peer and connection metadata for the given peer + fn remove_peer_and_connection( + peers_and_metadata: Arc, + peer_network_id: PeerNetworkId, + ) { + let peer_metadata = peers_and_metadata + .get_metadata_for_peer(peer_network_id) + .unwrap(); + let connection_id = peer_metadata.get_connection_metadata().connection_id; + peers_and_metadata + .remove_peer_metadata(peer_network_id, connection_id) + .unwrap(); + } + + /// Verifies the active subscription peers + fn verify_active_subscription_peers( + subscription_manager: &SubscriptionManager, + expected_active_peers: Vec, + ) { + // Get the active subscription peers + let active_peers = subscription_manager.get_active_subscription_peers(); + + // Verify the active subscription peers + for peer in &expected_active_peers { + assert!(active_peers.contains(peer)); + } + assert_eq!(active_peers.len(), expected_active_peers.len()); + } + + /// Verifies the status of the active subscription creation task + fn verify_subscription_creation_task( + subscription_manager: &SubscriptionManager, + expect_active_task: bool, + ) { + let current_active_task = subscription_manager + .active_subscription_creation_task + .lock() + .is_some(); + 
assert_eq!(current_active_task, expect_active_task); + } + + /// Verifies the list of terminated unhealthy subscriptions + fn verify_terminated_unhealthy_subscriptions( subscription_manager: &mut SubscriptionManager, - ) -> Vec<(PeerNetworkId, Error)> { + expected_terminated_peers: Vec, + ) { // Get the connected peers and metadata let connected_peers_and_metadata = subscription_manager.get_connected_peers_and_metadata(); // Terminate any unhealthy subscriptions - subscription_manager.terminate_unhealthy_subscriptions(&connected_peers_and_metadata) + let terminated_subscriptions = + subscription_manager.terminate_unhealthy_subscriptions(&connected_peers_and_metadata); + + // Verify the terminated subscriptions + for (terminated_subscription_peer, _) in &terminated_subscriptions { + assert!(expected_terminated_peers.contains(terminated_subscription_peer)); + } + assert_eq!( + terminated_subscriptions.len(), + expected_terminated_peers.len() + ); } } From 9e835f70cc671b90ab3d03cdb9cb319f57912e04 Mon Sep 17 00:00:00 2001 From: Josh Lind Date: Fri, 13 Sep 2024 14:03:58 -0400 Subject: [PATCH 4/4] [Consensus Observer] Improve subscription utility unit tests. 
--- .../observer/subscription_utils.rs | 391 +++++++++++++++++- 1 file changed, 377 insertions(+), 14 deletions(-) diff --git a/consensus/src/consensus_observer/observer/subscription_utils.rs b/consensus/src/consensus_observer/observer/subscription_utils.rs index 7dd5ffa9b2ace..d654af8aaf0d5 100644 --- a/consensus/src/consensus_observer/observer/subscription_utils.rs +++ b/consensus/src/consensus_observer/observer/subscription_utils.rs @@ -358,20 +358,227 @@ fn supports_consensus_observer(peer_metadata: &PeerMetadata) -> bool { #[cfg(test)] mod tests { use super::*; + use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::{config::PeerRole, network_id::NetworkId}; use aptos_netcore::transport::ConnectionOrigin; use aptos_network::{ application::storage::PeersAndMetadata, - protocols::wire::handshake::v1::{MessagingProtocolVersion, ProtocolIdSet}, + peer_manager::{ConnectionRequestSender, PeerManagerRequest, PeerManagerRequestSender}, + protocols::{ + network::{NetworkSender, NewNetworkSender}, + wire::handshake::v1::{MessagingProtocolVersion, ProtocolIdSet}, + }, transport::{ConnectionId, ConnectionMetadata}, }; use aptos_peer_monitoring_service_types::{ response::NetworkInformationResponse, PeerMonitoringMetadata, }; - use aptos_types::{network_address::NetworkAddress, PeerId}; - use maplit::hashmap; + use aptos_storage_interface::Result; + use aptos_types::{network_address::NetworkAddress, transaction::Version, PeerId}; + use bytes::Bytes; + use futures::StreamExt; + use mockall::mock; use std::collections::HashSet; + // This is a simple mock of the DbReader (it generates a MockDatabaseReader) + mock! 
{ + pub DatabaseReader {} + impl DbReader for DatabaseReader { + fn get_latest_ledger_info_version(&self) -> Result; + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_create_new_subscriptions() { + // Create a consensus observer config and client + let consensus_observer_config = ConsensusObserverConfig::default(); + let network_ids = &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public]; + let (peers_and_metadata, consensus_observer_client, mut peer_manager_request_receivers) = + create_consensus_observer_client(network_ids); + + // Create a list of connected peers (one per network) + let mut connected_peers = vec![]; + for network_id in &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public] { + // Create a new peer + let peer_network_id = create_peer_and_connection( + *network_id, + peers_and_metadata.clone(), + get_distance_from_validators(network_id), + None, + true, + ); + + // Add the peer to the list of sorted peers + connected_peers.push(peer_network_id); + } + + // Get the connected peers and metadata + let connected_peers_and_metadata = peers_and_metadata + .get_connected_peers_and_metadata() + .unwrap(); + + // Spawn the subscription creation task to create 2 subscriptions + let num_subscriptions_to_create = 2; + let subscription_creation_handle = tokio::spawn(async move { + create_new_subscriptions( + consensus_observer_config, + consensus_observer_client.clone(), + None, + Arc::new(MockDatabaseReader::new()), + TimeService::mock(), + connected_peers_and_metadata, + num_subscriptions_to_create, + vec![], + vec![], + ) + .await + }); + + // Handle the peer manager requests made by the subscription creation task. + // The VFN peer should fail the subscription request. 
+ for connected_peer in &connected_peers { + let network_id = connected_peer.network_id(); + handle_next_subscription_request( + network_id, + &mut peer_manager_request_receivers, + network_id != NetworkId::Vfn, // The VFN peer should fail the subscription request + ) + .await; + } + + // Wait for the subscription creation task to complete + let consensus_observer_subscriptions = subscription_creation_handle.await.unwrap(); + + // Verify the number of created subscriptions + assert_eq!( + consensus_observer_subscriptions.len(), + num_subscriptions_to_create + ); + + // Verify the created subscription peers + let first_peer = *connected_peers.first().unwrap(); + let last_peer = *connected_peers.last().unwrap(); + let expected_subscription_peers = [first_peer, last_peer]; + for consensus_observer_subscription in consensus_observer_subscriptions { + let peer_network_id = consensus_observer_subscription.get_peer_network_id(); + assert!(expected_subscription_peers.contains(&peer_network_id)); + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_create_new_subscriptions_multiple() { + // Create a consensus observer config and client + let consensus_observer_config = ConsensusObserverConfig::default(); + let network_ids = &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public]; + let (peers_and_metadata, consensus_observer_client, mut peer_manager_request_receivers) = + create_consensus_observer_client(network_ids); + + // Create a list of connected peers (one per network) + let mut connected_peers = vec![]; + for network_id in &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public] { + // Create a new peer + let peer_network_id = create_peer_and_connection( + *network_id, + peers_and_metadata.clone(), + get_distance_from_validators(network_id), + None, + true, + ); + + // Add the peer to the list of sorted peers + connected_peers.push(peer_network_id); + } + + // Create multiple sets of subscriptions and verify the results + for 
num_subscriptions_to_create in [0, 1, 2, 3, 10] { + // Determine the expected subscription peers + let expected_subscription_peers = connected_peers + .iter() + .take(num_subscriptions_to_create) + .cloned() + .collect(); + + // Create the subscriptions and verify the result + create_and_verify_subscriptions( + consensus_observer_config, + peers_and_metadata.clone(), + consensus_observer_client.clone(), + &mut peer_manager_request_receivers, + num_subscriptions_to_create, + expected_subscription_peers, + ) + .await; + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_create_single_subscription() { + // Create a consensus observer config and client + let consensus_observer_config = ConsensusObserverConfig::default(); + let network_ids = &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public]; + let (peers_and_metadata, consensus_observer_client, mut peer_manager_request_receivers) = + create_consensus_observer_client(network_ids); + + // Create a list of connected peers (one per network) + let mut connected_peers = vec![]; + for network_id in &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public] { + // Create a new peer + let peer_network_id = + create_peer_and_connection(*network_id, peers_and_metadata.clone(), 0, None, true); + + // Add the peer to the list of sorted peers + connected_peers.push(peer_network_id); + } + + // Spawn the subscription creation task + let sorted_potential_peers = connected_peers.clone(); + let subscription_creation_handle = tokio::spawn(async move { + create_single_subscription( + consensus_observer_config, + consensus_observer_client.clone(), + Arc::new(MockDatabaseReader::new()), + sorted_potential_peers, + TimeService::mock(), + ) + .await + }); + + // Handle the peer manager requests made by the subscription creation task. + // We should only respond successfully to the peer on the public network. 
+ handle_next_subscription_request( + NetworkId::Validator, + &mut peer_manager_request_receivers, + false, + ) + .await; + handle_next_subscription_request( + NetworkId::Vfn, + &mut peer_manager_request_receivers, + false, + ) + .await; + handle_next_subscription_request( + NetworkId::Public, + &mut peer_manager_request_receivers, + true, + ) + .await; + + // Wait for the subscription creation task to complete + let (observer_subscription, failed_subscription_peers) = + subscription_creation_handle.await.unwrap(); + + // Verify that the public peer was successfully subscribed to + assert_eq!( + &observer_subscription.unwrap().get_peer_network_id(), + connected_peers.last().unwrap() + ); + + // Verify that the other peers failed our subscription attempts + let expected_failed_peers = connected_peers.iter().take(2).cloned().collect::>(); + assert_eq!(failed_subscription_peers, expected_failed_peers); + } + #[test] fn test_sort_peers_by_distance_and_latency() { // Sort an empty list of peers @@ -487,7 +694,7 @@ mod tests { async fn test_sort_peers_for_subscriptions() { // Create a consensus observer client let network_ids = &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public]; - let (peers_and_metadata, consensus_observer_client) = + let (peers_and_metadata, consensus_observer_client, _) = create_consensus_observer_client(network_ids); // Create a consensus publisher @@ -507,15 +714,10 @@ mod tests { // Add a connected validator peer, VFN peer and public peer for network_id in network_ids { - let distance_from_validators = match network_id { - NetworkId::Validator => 0, - NetworkId::Vfn => 1, - NetworkId::Public => 2, - }; create_peer_and_connection( *network_id, peers_and_metadata.clone(), - distance_from_validators, + get_distance_from_validators(network_id), None, true, ); @@ -609,6 +811,64 @@ mod tests { assert_eq!(sorted_peers, expected_peers); } + /// Creates new subscriptions and verifies the results + async fn create_and_verify_subscriptions( + 
consensus_observer_config: ConsensusObserverConfig, + peers_and_metadata: Arc, + consensus_observer_client: Arc< + ConsensusObserverClient>, + >, + peer_manager_request_receivers: &mut HashMap< + NetworkId, + aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>, + >, + num_subscriptions_to_create: usize, + expected_subscription_peers: Vec, + ) { + // Get the connected peers and metadata + let connected_peers_and_metadata = peers_and_metadata + .get_connected_peers_and_metadata() + .unwrap(); + + // Spawn the subscription creation task + let subscription_creation_handle = tokio::spawn(async move { + create_new_subscriptions( + consensus_observer_config, + consensus_observer_client.clone(), + None, + Arc::new(MockDatabaseReader::new()), + TimeService::mock(), + connected_peers_and_metadata, + num_subscriptions_to_create, + vec![], + vec![], + ) + .await + }); + + // Handle the peer manager requests made by the subscription creation task + for expected_subscription_peer in &expected_subscription_peers { + handle_next_subscription_request( + expected_subscription_peer.network_id(), + peer_manager_request_receivers, + true, + ) + .await; + } + + // Wait for the subscription creation task to complete + let consensus_observer_subscriptions = subscription_creation_handle.await.unwrap(); + + // Verify the created subscriptions + assert_eq!( + consensus_observer_subscriptions.len(), + expected_subscription_peers.len() + ); + for subscription in consensus_observer_subscriptions { + assert!(expected_subscription_peers.contains(&subscription.get_peer_network_id())); + } + } + /// Creates a new connection metadata for testing fn create_connection_metadata( peer_network_id: PeerNetworkId, @@ -636,19 +896,52 @@ mod tests { } } - /// Creates a new consensus observer client and a peers and metadata container + /// Creates a new consensus observer client, along with the + /// associated network senders and peers and metadata. 
fn create_consensus_observer_client( network_ids: &[NetworkId], ) -> ( Arc, Arc>>, + HashMap>, ) { + // Create the network senders and receivers for each network + let mut network_senders = HashMap::new(); + let mut peer_manager_request_receivers = HashMap::new(); + for network_id in network_ids { + // Create the request managers + let queue_cfg = aptos_channel::Config::new(10).queue_style(QueueStyle::FIFO); + let (peer_manager_request_sender, peer_manager_request_receiver) = queue_cfg.build(); + let (connected_request_sender, _) = queue_cfg.build(); + + // Create the network sender + let network_sender = NetworkSender::new( + PeerManagerRequestSender::new(peer_manager_request_sender), + ConnectionRequestSender::new(connected_request_sender), + ); + + // Save the network sender and the request receiver + network_senders.insert(*network_id, network_sender); + peer_manager_request_receivers.insert(*network_id, peer_manager_request_receiver); + } + + // Create the network client let peers_and_metadata = PeersAndMetadata::new(network_ids); - let network_client = - NetworkClient::new(vec![], vec![], hashmap![], peers_and_metadata.clone()); + let network_client = NetworkClient::new( + vec![ProtocolId::ConsensusObserver], + vec![ProtocolId::ConsensusObserverRpc], + network_senders, + peers_and_metadata.clone(), + ); + + // Create the consensus observer client let consensus_observer_client = Arc::new(ConsensusObserverClient::new(network_client)); - (peers_and_metadata, consensus_observer_client) + ( + peers_and_metadata, + consensus_observer_client, + peer_manager_request_receivers, + ) } /// Creates a new peer with the specified connection metadata @@ -754,6 +1047,76 @@ mod tests { peers_and_metadata } + /// Returns the distance from the validators for the specified network + fn get_distance_from_validators(network_id: &NetworkId) -> u64 { + match network_id { + NetworkId::Validator => 0, + NetworkId::Vfn => 1, + NetworkId::Public => 2, + } + } + + /// Fetches and handles 
the next subscription request from the peer manager + async fn handle_next_subscription_request( + network_id: NetworkId, + peer_manager_request_receivers: &mut HashMap< + NetworkId, + aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>, + >, + return_successfully: bool, + ) { + // Get the request receiver for the given network + let peer_manager_request_receiver = + peer_manager_request_receivers.get_mut(&network_id).unwrap(); + + // Wait for the next subscription request + match peer_manager_request_receiver.next().await { + Some(PeerManagerRequest::SendRpc(_, network_request)) => { + // Parse the network request + let data = network_request.data; + let response_sender = network_request.res_tx; + let message: ConsensusObserverMessage = bcs::from_bytes(data.as_ref()).unwrap(); + + // Process the network message + match message { + ConsensusObserverMessage::Request(request) => { + // Verify the request is for a new subscription + match request { + ConsensusObserverRequest::Subscribe => (), + _ => panic!( + "Unexpected consensus observer request received: {:?}!", + request + ), + } + + // Determine the response to send + let response = if return_successfully { + // Ack the subscription request + ConsensusObserverResponse::SubscribeAck + } else { + // Respond with the wrong message type + ConsensusObserverResponse::UnsubscribeAck + }; + let response_message = ConsensusObserverMessage::Response(response); + + // Send the response to the peer + let response_bytes = + bcs::to_bytes(&response_message).map(Bytes::from).unwrap(); + let _ = response_sender.send(Ok(response_bytes)); + }, + _ => panic!( + "Unexpected consensus observer message type received: {:?}!", + message + ), + } + }, + Some(PeerManagerRequest::SendDirectSend(_, _)) => { + panic!("Unexpected direct send message received!") + }, + None => panic!("No subscription request received!"), + } + } + /// Removes the peer and connection metadata for the given peer fn remove_peer_and_connection( 
peers_and_metadata: Arc,