Skip to content

Commit

Permalink
[Consensus Observer] Improve subscription util unit tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Sep 13, 2024
1 parent 326056d commit e61a54e
Showing 1 changed file with 191 additions and 8 deletions.
199 changes: 191 additions & 8 deletions consensus/src/consensus_observer/observer/subscription_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,20 +358,109 @@ fn supports_consensus_observer(peer_metadata: &PeerMetadata) -> bool {
#[cfg(test)]
mod tests {
use super::*;
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_config::{config::PeerRole, network_id::NetworkId};
use aptos_netcore::transport::ConnectionOrigin;
use aptos_network::{
application::storage::PeersAndMetadata,
protocols::wire::handshake::v1::{MessagingProtocolVersion, ProtocolIdSet},
peer_manager::{ConnectionRequestSender, PeerManagerRequest, PeerManagerRequestSender},
protocols::{
network::{NetworkSender, NewNetworkSender},
wire::handshake::v1::{MessagingProtocolVersion, ProtocolIdSet},
},
transport::{ConnectionId, ConnectionMetadata},
};
use aptos_peer_monitoring_service_types::{
response::NetworkInformationResponse, PeerMonitoringMetadata,
};
use aptos_types::{network_address::NetworkAddress, PeerId};
use maplit::hashmap;
use aptos_storage_interface::Result;
use aptos_types::{network_address::NetworkAddress, transaction::Version, PeerId};
use bytes::Bytes;
use futures::StreamExt;
use mockall::mock;
use std::collections::HashSet;

// This is a simple mock of the DbReader (it generates a MockDatabaseReader)
mock! {
pub DatabaseReader {}
impl DbReader for DatabaseReader {
fn get_latest_ledger_info_version(&self) -> Result<Version>;
}
}

#[tokio::test(flavor = "multi_thread")]
async fn test_create_single_subscription() {
// Create a consensus observer config with a short network timeout
let consensus_observer_config = ConsensusObserverConfig {
network_request_timeout_ms: 1000,
..ConsensusObserverConfig::default()
};

// Create a consensus observer client
let network_ids = &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public];
let (peers_and_metadata, consensus_observer_client, mut peer_manager_request_receivers) =
create_consensus_observer_client(network_ids);

// Create a list of connected peers (one per network)
let mut connected_peers = vec![];
for network_id in &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public] {
// Create a new peer
let peer_network_id =
create_peer_and_connection(*network_id, peers_and_metadata.clone(), 0, None, true);

// Add the peer to the list of sorted peers
connected_peers.push(peer_network_id);
}

// Create a single subscription to the list of sorted peers
let sorted_potential_peers = connected_peers.clone();
let subscription_creation_handle = tokio::spawn(async move {
create_single_subscription(
consensus_observer_config,
consensus_observer_client.clone(),
Arc::new(MockDatabaseReader::new()),
sorted_potential_peers,
TimeService::mock(),
)
.await
});

// Handle the peer manager requests made by the subscription creation task.
// We should only respond successfully to the peer on the public network.
handle_next_subscription_request(
NetworkId::Validator,
&mut peer_manager_request_receivers,
false,
)
.await;
handle_next_subscription_request(
NetworkId::Vfn,
&mut peer_manager_request_receivers,
false,
)
.await;
handle_next_subscription_request(
NetworkId::Public,
&mut peer_manager_request_receivers,
true,
)
.await;

// Wait for the subscription creation task to complete
let (observer_subscription, failed_subscription_peers) =
subscription_creation_handle.await.unwrap();

// Verify that the public peer was successfully subscribed to
assert_eq!(
&observer_subscription.unwrap().get_peer_network_id(),
connected_peers.last().unwrap()
);

// Verify that the other peers failed our subscription attempts
let expected_failed_peers = connected_peers.iter().take(2).cloned().collect::<Vec<_>>();
assert_eq!(failed_subscription_peers, expected_failed_peers);
}

#[test]
fn test_sort_peers_by_distance_and_latency() {
// Sort an empty list of peers
Expand Down Expand Up @@ -487,7 +576,7 @@ mod tests {
async fn test_sort_peers_for_subscriptions() {
// Create a consensus observer client
let network_ids = &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public];
let (peers_and_metadata, consensus_observer_client) =
let (peers_and_metadata, consensus_observer_client, _) =
create_consensus_observer_client(network_ids);

// Create a consensus publisher
Expand Down Expand Up @@ -636,19 +725,52 @@ mod tests {
}
}

/// Creates a new consensus observer client and a peers and metadata container
/// Creates a new consensus observer client, along with the
/// associated network senders and peers and metadata.
fn create_consensus_observer_client(
network_ids: &[NetworkId],
) -> (
Arc<PeersAndMetadata>,
Arc<ConsensusObserverClient<NetworkClient<ConsensusObserverMessage>>>,
HashMap<NetworkId, aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>>,
) {
// Create the network senders and receivers for each network
let mut network_senders = HashMap::new();
let mut peer_manager_request_receivers = HashMap::new();
for network_id in network_ids {
// Create the request managers
let queue_cfg = aptos_channel::Config::new(10).queue_style(QueueStyle::FIFO);
let (peer_manager_request_sender, peer_manager_request_receiver) = queue_cfg.build();
let (connected_request_sender, _) = queue_cfg.build();

// Create the network sender
let network_sender = NetworkSender::new(
PeerManagerRequestSender::new(peer_manager_request_sender),
ConnectionRequestSender::new(connected_request_sender),
);

// Save the network sender and the request receiver
network_senders.insert(*network_id, network_sender);
peer_manager_request_receivers.insert(*network_id, peer_manager_request_receiver);
}

// Create the network client
let peers_and_metadata = PeersAndMetadata::new(network_ids);
let network_client =
NetworkClient::new(vec![], vec![], hashmap![], peers_and_metadata.clone());
let network_client = NetworkClient::new(
vec![ProtocolId::ConsensusObserver],
vec![ProtocolId::ConsensusObserverRpc],
network_senders,
peers_and_metadata.clone(),
);

// Create the consensus observer client
let consensus_observer_client = Arc::new(ConsensusObserverClient::new(network_client));

(peers_and_metadata, consensus_observer_client)
(
peers_and_metadata,
consensus_observer_client,
peer_manager_request_receivers,
)
}

/// Creates a new peer with the specified connection metadata
Expand Down Expand Up @@ -820,4 +942,65 @@ mod tests {
previous_distance = distance;
}
}

/// Fetches and handles the next subscription request from the peer manager
async fn handle_next_subscription_request(
network_id: NetworkId,
peer_manager_request_receivers: &mut HashMap<
NetworkId,
aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>,
>,
return_successfully: bool,
) {
// Get the request receiver for the given network
let peer_manager_request_receiver =
peer_manager_request_receivers.get_mut(&network_id).unwrap();

// Wait for the next subscription request
match peer_manager_request_receiver.next().await {
Some(PeerManagerRequest::SendRpc(_, network_request)) => {
// Parse the network request
let data = network_request.data;
let response_sender = network_request.res_tx;
let message: ConsensusObserverMessage = bcs::from_bytes(data.as_ref()).unwrap();

// Process the network message
match message {
ConsensusObserverMessage::Request(request) => {
// Verify the request is for a new subscription
match request {
ConsensusObserverRequest::Subscribe => (),
_ => panic!(
"Unexpected consensus observer request received: {:?}!",
request
),
}

// Determine the response to send
let response = if return_successfully {
// Ack the subscription request
ConsensusObserverResponse::SubscribeAck
} else {
// Respond with the wrong message type
ConsensusObserverResponse::UnsubscribeAck
};
let response_message = ConsensusObserverMessage::Response(response);

// Send the response to the peer
let response_bytes =
bcs::to_bytes(&response_message).map(Bytes::from).unwrap();
let _ = response_sender.send(Ok(response_bytes));
},
_ => panic!(
"Unexpected consensus observer message type received: {:?}!",
message
),
}
},
Some(PeerManagerRequest::SendDirectSend(_, _)) => {
panic!("Unexpected direct send message received!")
},
None => panic!("No subscription request received!"),
}
}
}

0 comments on commit e61a54e

Please sign in to comment.