diff --git a/consensus/src/consensus_observer/observer/subscription_utils.rs b/consensus/src/consensus_observer/observer/subscription_utils.rs index 7dd5ffa9b2ace3..d654af8aaf0d5d 100644 --- a/consensus/src/consensus_observer/observer/subscription_utils.rs +++ b/consensus/src/consensus_observer/observer/subscription_utils.rs @@ -358,20 +358,227 @@ fn supports_consensus_observer(peer_metadata: &PeerMetadata) -> bool { #[cfg(test)] mod tests { use super::*; + use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::{config::PeerRole, network_id::NetworkId}; use aptos_netcore::transport::ConnectionOrigin; use aptos_network::{ application::storage::PeersAndMetadata, - protocols::wire::handshake::v1::{MessagingProtocolVersion, ProtocolIdSet}, + peer_manager::{ConnectionRequestSender, PeerManagerRequest, PeerManagerRequestSender}, + protocols::{ + network::{NetworkSender, NewNetworkSender}, + wire::handshake::v1::{MessagingProtocolVersion, ProtocolIdSet}, + }, transport::{ConnectionId, ConnectionMetadata}, }; use aptos_peer_monitoring_service_types::{ response::NetworkInformationResponse, PeerMonitoringMetadata, }; - use aptos_types::{network_address::NetworkAddress, PeerId}; - use maplit::hashmap; + use aptos_storage_interface::Result; + use aptos_types::{network_address::NetworkAddress, transaction::Version, PeerId}; + use bytes::Bytes; + use futures::StreamExt; + use mockall::mock; use std::collections::HashSet; + // This is a simple mock of the DbReader (it generates a MockDatabaseReader) + mock! { + pub DatabaseReader {} + impl DbReader for DatabaseReader { + fn get_latest_ledger_info_version(&self) -> Result; + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_create_new_subscriptions() { + // Create a consensus observer config and client + let consensus_observer_config = ConsensusObserverConfig::default(); + let network_ids = &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public]; + let (peers_and_metadata, consensus_observer_client, mut peer_manager_request_receivers) = + create_consensus_observer_client(network_ids); + + // Create a list of connected peers (one per network) + let mut connected_peers = vec![]; + for network_id in &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public] { + // Create a new peer + let peer_network_id = create_peer_and_connection( + *network_id, + peers_and_metadata.clone(), + get_distance_from_validators(network_id), + None, + true, + ); + + // Add the peer to the list of sorted peers + connected_peers.push(peer_network_id); + } + + // Get the connected peers and metadata + let connected_peers_and_metadata = peers_and_metadata + .get_connected_peers_and_metadata() + .unwrap(); + + // Spawn the subscription creation task to create 2 subscriptions + let num_subscriptions_to_create = 2; + let subscription_creation_handle = tokio::spawn(async move { + create_new_subscriptions( + consensus_observer_config, + consensus_observer_client.clone(), + None, + Arc::new(MockDatabaseReader::new()), + TimeService::mock(), + connected_peers_and_metadata, + num_subscriptions_to_create, + vec![], + vec![], + ) + .await + }); + + // Handle the peer manager requests made by the subscription creation task. + // The VFN peer should fail the subscription request. + for connected_peer in &connected_peers { + let network_id = connected_peer.network_id(); + handle_next_subscription_request( + network_id, + &mut peer_manager_request_receivers, + network_id != NetworkId::Vfn, // The VFN peer should fail the subscription request + ) + .await; + } + + // Wait for the subscription creation task to complete + let consensus_observer_subscriptions = subscription_creation_handle.await.unwrap(); + + // Verify the number of created subscriptions + assert_eq!( + consensus_observer_subscriptions.len(), + num_subscriptions_to_create + ); + + // Verify the created subscription peers + let first_peer = *connected_peers.first().unwrap(); + let last_peer = *connected_peers.last().unwrap(); + let expected_subscription_peers = [first_peer, last_peer]; + for consensus_observer_subscription in consensus_observer_subscriptions { + let peer_network_id = consensus_observer_subscription.get_peer_network_id(); + assert!(expected_subscription_peers.contains(&peer_network_id)); + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_create_new_subscriptions_multiple() { + // Create a consensus observer config and client + let consensus_observer_config = ConsensusObserverConfig::default(); + let network_ids = &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public]; + let (peers_and_metadata, consensus_observer_client, mut peer_manager_request_receivers) = + create_consensus_observer_client(network_ids); + + // Create a list of connected peers (one per network) + let mut connected_peers = vec![]; + for network_id in &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public] { + // Create a new peer + let peer_network_id = create_peer_and_connection( + *network_id, + peers_and_metadata.clone(), + get_distance_from_validators(network_id), + None, + true, + ); + + // Add the peer to the list of sorted peers + connected_peers.push(peer_network_id); + } + + // Create multiple sets of subscriptions and verify the results + for num_subscriptions_to_create in [0, 1, 2, 3, 10] { + // Determine the expected subscription peers + let expected_subscription_peers = connected_peers + .iter() + .take(num_subscriptions_to_create) + .cloned() + .collect(); + + // Create the subscriptions and verify the result + create_and_verify_subscriptions( + consensus_observer_config, + peers_and_metadata.clone(), + consensus_observer_client.clone(), + &mut peer_manager_request_receivers, + num_subscriptions_to_create, + expected_subscription_peers, + ) + .await; + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_create_single_subscription() { + // Create a consensus observer config and client + let consensus_observer_config = ConsensusObserverConfig::default(); + let network_ids = &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public]; + let (peers_and_metadata, consensus_observer_client, mut peer_manager_request_receivers) = + create_consensus_observer_client(network_ids); + + // Create a list of connected peers (one per network) + let mut connected_peers = vec![]; + for network_id in &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public] { + // Create a new peer + let peer_network_id = + create_peer_and_connection(*network_id, peers_and_metadata.clone(), 0, None, true); + + // Add the peer to the list of sorted peers + connected_peers.push(peer_network_id); + } + + // Spawn the subscription creation task + let sorted_potential_peers = connected_peers.clone(); + let subscription_creation_handle = tokio::spawn(async move { + create_single_subscription( + consensus_observer_config, + consensus_observer_client.clone(), + Arc::new(MockDatabaseReader::new()), + sorted_potential_peers, + TimeService::mock(), + ) + .await + }); + + // Handle the peer manager requests made by the subscription creation task. + // We should only respond successfully to the peer on the public network. + handle_next_subscription_request( + NetworkId::Validator, + &mut peer_manager_request_receivers, + false, + ) + .await; + handle_next_subscription_request( + NetworkId::Vfn, + &mut peer_manager_request_receivers, + false, + ) + .await; + handle_next_subscription_request( + NetworkId::Public, + &mut peer_manager_request_receivers, + true, + ) + .await; + + // Wait for the subscription creation task to complete + let (observer_subscription, failed_subscription_peers) = + subscription_creation_handle.await.unwrap(); + + // Verify that the public peer was successfully subscribed to + assert_eq!( + &observer_subscription.unwrap().get_peer_network_id(), + connected_peers.last().unwrap() + ); + + // Verify that the other peers failed our subscription attempts + let expected_failed_peers = connected_peers.iter().take(2).cloned().collect::>(); + assert_eq!(failed_subscription_peers, expected_failed_peers); + } + #[test] fn test_sort_peers_by_distance_and_latency() { // Sort an empty list of peers @@ -487,7 +694,7 @@ mod tests { async fn test_sort_peers_for_subscriptions() { // Create a consensus observer client let network_ids = &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public]; - let (peers_and_metadata, consensus_observer_client) = + let (peers_and_metadata, consensus_observer_client, _) = create_consensus_observer_client(network_ids); // Create a consensus publisher @@ -507,15 +714,10 @@ mod tests { // Add a connected validator peer, VFN peer and public peer for network_id in network_ids { - let distance_from_validators = match network_id { - NetworkId::Validator => 0, - NetworkId::Vfn => 1, - NetworkId::Public => 2, - }; create_peer_and_connection( *network_id, peers_and_metadata.clone(), - distance_from_validators, + get_distance_from_validators(network_id), None, true, ); @@ -609,6 +811,64 @@ mod tests { assert_eq!(sorted_peers, expected_peers); } + /// Creates new subscriptions and verifies the results + async fn create_and_verify_subscriptions( + consensus_observer_config: ConsensusObserverConfig, + peers_and_metadata: Arc, + consensus_observer_client: Arc< + ConsensusObserverClient>, + >, + peer_manager_request_receivers: &mut HashMap< + NetworkId, + aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>, + >, + num_subscriptions_to_create: usize, + expected_subscription_peers: Vec, + ) { + // Get the connected peers and metadata + let connected_peers_and_metadata = peers_and_metadata + .get_connected_peers_and_metadata() + .unwrap(); + + // Spawn the subscription creation task + let subscription_creation_handle = tokio::spawn(async move { + create_new_subscriptions( + consensus_observer_config, + consensus_observer_client.clone(), + None, + Arc::new(MockDatabaseReader::new()), + TimeService::mock(), + connected_peers_and_metadata, + num_subscriptions_to_create, + vec![], + vec![], + ) + .await + }); + + // Handle the peer manager requests made by the subscription creation task + for expected_subscription_peer in &expected_subscription_peers { + handle_next_subscription_request( + expected_subscription_peer.network_id(), + peer_manager_request_receivers, + true, + ) + .await; + } + + // Wait for the subscription creation task to complete + let consensus_observer_subscriptions = subscription_creation_handle.await.unwrap(); + + // Verify the created subscriptions + assert_eq!( + consensus_observer_subscriptions.len(), + expected_subscription_peers.len() + ); + for subscription in consensus_observer_subscriptions { + assert!(expected_subscription_peers.contains(&subscription.get_peer_network_id())); + } + } + /// Creates a new connection metadata for testing fn create_connection_metadata( peer_network_id: PeerNetworkId, @@ -636,19 +896,52 @@ mod tests { } } - /// Creates a new consensus observer client and a peers and metadata container + /// Creates a new consensus observer client, along with the + /// associated network senders and peers and metadata. fn create_consensus_observer_client( network_ids: &[NetworkId], ) -> ( Arc, Arc>>, + HashMap>, ) { + // Create the network senders and receivers for each network + let mut network_senders = HashMap::new(); + let mut peer_manager_request_receivers = HashMap::new(); + for network_id in network_ids { + // Create the request managers + let queue_cfg = aptos_channel::Config::new(10).queue_style(QueueStyle::FIFO); + let (peer_manager_request_sender, peer_manager_request_receiver) = queue_cfg.build(); + let (connected_request_sender, _) = queue_cfg.build(); + + // Create the network sender + let network_sender = NetworkSender::new( + PeerManagerRequestSender::new(peer_manager_request_sender), + ConnectionRequestSender::new(connected_request_sender), + ); + + // Save the network sender and the request receiver + network_senders.insert(*network_id, network_sender); + peer_manager_request_receivers.insert(*network_id, peer_manager_request_receiver); + } + + // Create the network client let peers_and_metadata = PeersAndMetadata::new(network_ids); - let network_client = - NetworkClient::new(vec![], vec![], hashmap![], peers_and_metadata.clone()); + let network_client = NetworkClient::new( + vec![ProtocolId::ConsensusObserver], + vec![ProtocolId::ConsensusObserverRpc], + network_senders, + peers_and_metadata.clone(), + ); + + // Create the consensus observer client let consensus_observer_client = Arc::new(ConsensusObserverClient::new(network_client)); - (peers_and_metadata, consensus_observer_client) + ( + peers_and_metadata, + consensus_observer_client, + peer_manager_request_receivers, + ) } /// Creates a new peer with the specified connection metadata @@ -754,6 +1047,76 @@ mod tests { peers_and_metadata } + /// Returns the distance from the validators for the specified network + fn get_distance_from_validators(network_id: &NetworkId) -> u64 { + match network_id { + NetworkId::Validator => 0, + NetworkId::Vfn => 1, + NetworkId::Public => 2, + } + } + + /// Fetches and handles the next subscription request from the peer manager + async fn handle_next_subscription_request( + network_id: NetworkId, + peer_manager_request_receivers: &mut HashMap< + NetworkId, + aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>, + >, + return_successfully: bool, + ) { + // Get the request receiver for the given network + let peer_manager_request_receiver = + peer_manager_request_receivers.get_mut(&network_id).unwrap(); + + // Wait for the next subscription request + match peer_manager_request_receiver.next().await { + Some(PeerManagerRequest::SendRpc(_, network_request)) => { + // Parse the network request + let data = network_request.data; + let response_sender = network_request.res_tx; + let message: ConsensusObserverMessage = bcs::from_bytes(data.as_ref()).unwrap(); + + // Process the network message + match message { + ConsensusObserverMessage::Request(request) => { + // Verify the request is for a new subscription + match request { + ConsensusObserverRequest::Subscribe => (), + _ => panic!( + "Unexpected consensus observer request received: {:?}!", + request + ), + } + + // Determine the response to send + let response = if return_successfully { + // Ack the subscription request + ConsensusObserverResponse::SubscribeAck + } else { + // Respond with the wrong message type + ConsensusObserverResponse::UnsubscribeAck + }; + let response_message = ConsensusObserverMessage::Response(response); + + // Send the response to the peer + let response_bytes = + bcs::to_bytes(&response_message).map(Bytes::from).unwrap(); + let _ = response_sender.send(Ok(response_bytes)); + }, + _ => panic!( + "Unexpected consensus observer message type received: {:?}!", + message + ), + } + }, + Some(PeerManagerRequest::SendDirectSend(_, _)) => { + panic!("Unexpected direct send message received!") + }, + None => panic!("No subscription request received!"), + } + } + /// Removes the peer and connection metadata for the given peer fn remove_peer_and_connection( peers_and_metadata: Arc,