Skip to content

Commit

Permalink
[Consensus Observer] Improve subscription util unit tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Sep 13, 2024
1 parent 326056d commit ac75a80
Showing 1 changed file with 310 additions and 14 deletions.
324 changes: 310 additions & 14 deletions consensus/src/consensus_observer/observer/subscription_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,20 +358,160 @@ fn supports_consensus_observer(peer_metadata: &PeerMetadata) -> bool {
#[cfg(test)]
mod tests {
use super::*;
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_config::{config::PeerRole, network_id::NetworkId};
use aptos_netcore::transport::ConnectionOrigin;
use aptos_network::{
application::storage::PeersAndMetadata,
protocols::wire::handshake::v1::{MessagingProtocolVersion, ProtocolIdSet},
peer_manager::{ConnectionRequestSender, PeerManagerRequest, PeerManagerRequestSender},
protocols::{
network::{NetworkSender, NewNetworkSender},
wire::handshake::v1::{MessagingProtocolVersion, ProtocolIdSet},
},
transport::{ConnectionId, ConnectionMetadata},
};
use aptos_peer_monitoring_service_types::{
response::NetworkInformationResponse, PeerMonitoringMetadata,
};
use aptos_types::{network_address::NetworkAddress, PeerId};
use maplit::hashmap;
use aptos_storage_interface::Result;
use aptos_types::{network_address::NetworkAddress, transaction::Version, PeerId};
use bytes::Bytes;
use futures::StreamExt;
use mockall::mock;
use std::collections::HashSet;

// This is a simple mock of the DbReader (it generates a MockDatabaseReader)
mock! {
pub DatabaseReader {}
impl DbReader for DatabaseReader {
fn get_latest_ledger_info_version(&self) -> Result<Version>;
}
}

#[tokio::test(flavor = "multi_thread")]
async fn test_create_new_subscriptions_multiple() {
// Create a consensus observer config with a short network timeout
let consensus_observer_config = ConsensusObserverConfig {
network_request_timeout_ms: 1000,
..ConsensusObserverConfig::default()
};

// Create a consensus observer client
let network_ids = &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public];
let (peers_and_metadata, consensus_observer_client, mut peer_manager_request_receivers) =
create_consensus_observer_client(network_ids);

// Create a list of connected peers (one per network)
let mut connected_peers = vec![];
for network_id in &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public] {
// Create a new peer
let peer_network_id = create_peer_and_connection(
*network_id,
peers_and_metadata.clone(),
get_distance_from_validators(network_id),
None,
true,
);

// Add the peer to the list of sorted peers
connected_peers.push(peer_network_id);
}

// Create multiple sets of subscriptions and verify the results
for num_subscriptions_to_create in [0, 1, 2, 3, 10] {
// Determine the expected subscription peers
let expected_subscription_peers = connected_peers
.iter()
.take(num_subscriptions_to_create)
.cloned()
.collect();

// Create the subscriptions and verify the result
create_and_verify_subscriptions(
consensus_observer_config,
peers_and_metadata.clone(),
consensus_observer_client.clone(),
&mut peer_manager_request_receivers,
num_subscriptions_to_create,
expected_subscription_peers,
)
.await;
}
}

#[tokio::test(flavor = "multi_thread")]
async fn test_create_single_subscription() {
// Create a consensus observer config with a short network timeout
let consensus_observer_config = ConsensusObserverConfig {
network_request_timeout_ms: 1000,
..ConsensusObserverConfig::default()
};

// Create a consensus observer client
let network_ids = &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public];
let (peers_and_metadata, consensus_observer_client, mut peer_manager_request_receivers) =
create_consensus_observer_client(network_ids);

// Create a list of connected peers (one per network)
let mut connected_peers = vec![];
for network_id in &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public] {
// Create a new peer
let peer_network_id =
create_peer_and_connection(*network_id, peers_and_metadata.clone(), 0, None, true);

// Add the peer to the list of sorted peers
connected_peers.push(peer_network_id);
}

// Spawn the subscription creation task
let sorted_potential_peers = connected_peers.clone();
let subscription_creation_handle = tokio::spawn(async move {
create_single_subscription(
consensus_observer_config,
consensus_observer_client.clone(),
Arc::new(MockDatabaseReader::new()),
sorted_potential_peers,
TimeService::mock(),
)
.await
});

// Handle the peer manager requests made by the subscription creation task.
// We should only respond successfully to the peer on the public network.
handle_next_subscription_request(
NetworkId::Validator,
&mut peer_manager_request_receivers,
false,
)
.await;
handle_next_subscription_request(
NetworkId::Vfn,
&mut peer_manager_request_receivers,
false,
)
.await;
handle_next_subscription_request(
NetworkId::Public,
&mut peer_manager_request_receivers,
true,
)
.await;

// Wait for the subscription creation task to complete
let (observer_subscription, failed_subscription_peers) =
subscription_creation_handle.await.unwrap();

// Verify that the public peer was successfully subscribed to
assert_eq!(
&observer_subscription.unwrap().get_peer_network_id(),
connected_peers.last().unwrap()
);

// Verify that the other peers failed our subscription attempts
let expected_failed_peers = connected_peers.iter().take(2).cloned().collect::<Vec<_>>();
assert_eq!(failed_subscription_peers, expected_failed_peers);
}

#[test]
fn test_sort_peers_by_distance_and_latency() {
// Sort an empty list of peers
Expand Down Expand Up @@ -487,7 +627,7 @@ mod tests {
async fn test_sort_peers_for_subscriptions() {
// Create a consensus observer client
let network_ids = &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public];
let (peers_and_metadata, consensus_observer_client) =
let (peers_and_metadata, consensus_observer_client, _) =
create_consensus_observer_client(network_ids);

// Create a consensus publisher
Expand All @@ -507,15 +647,10 @@ mod tests {

// Add a connected validator peer, VFN peer and public peer
for network_id in network_ids {
let distance_from_validators = match network_id {
NetworkId::Validator => 0,
NetworkId::Vfn => 1,
NetworkId::Public => 2,
};
create_peer_and_connection(
*network_id,
peers_and_metadata.clone(),
distance_from_validators,
get_distance_from_validators(network_id),
None,
true,
);
Expand Down Expand Up @@ -609,6 +744,64 @@ mod tests {
assert_eq!(sorted_peers, expected_peers);
}

/// Creates new subscriptions and verifies the results
async fn create_and_verify_subscriptions(
consensus_observer_config: ConsensusObserverConfig,
peers_and_metadata: Arc<PeersAndMetadata>,
consensus_observer_client: Arc<
ConsensusObserverClient<NetworkClient<ConsensusObserverMessage>>,
>,
peer_manager_request_receivers: &mut HashMap<
NetworkId,
aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>,
>,
num_subscriptions_to_create: usize,
expected_subscription_peers: Vec<PeerNetworkId>,
) {
// Get the connected peers and metadata
let connected_peers_and_metadata = peers_and_metadata
.get_connected_peers_and_metadata()
.unwrap();

// Spawn the subscription creation task
let subscription_creation_handle = tokio::spawn(async move {
create_new_subscriptions(
consensus_observer_config,
consensus_observer_client.clone(),
None,
Arc::new(MockDatabaseReader::new()),
TimeService::mock(),
connected_peers_and_metadata,
num_subscriptions_to_create,
vec![],
vec![],
)
.await
});

// Handle the peer manager requests made by the subscription creation task
for expected_subscription_peer in &expected_subscription_peers {
handle_next_subscription_request(
expected_subscription_peer.network_id(),
peer_manager_request_receivers,
true,
)
.await;
}

// Wait for the subscription creation task to complete
let consensus_observer_subscriptions = subscription_creation_handle.await.unwrap();

// Verify the created subscriptions
assert_eq!(
consensus_observer_subscriptions.len(),
expected_subscription_peers.len()
);
for subscription in consensus_observer_subscriptions {
assert!(expected_subscription_peers.contains(&subscription.get_peer_network_id()));
}
}

/// Creates a new connection metadata for testing
fn create_connection_metadata(
peer_network_id: PeerNetworkId,
Expand Down Expand Up @@ -636,19 +829,52 @@ mod tests {
}
}

/// Creates a new consensus observer client and a peers and metadata container
/// Creates a new consensus observer client, along with the
/// associated network senders and peers and metadata.
fn create_consensus_observer_client(
network_ids: &[NetworkId],
) -> (
Arc<PeersAndMetadata>,
Arc<ConsensusObserverClient<NetworkClient<ConsensusObserverMessage>>>,
HashMap<NetworkId, aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>>,
) {
// Create the network senders and receivers for each network
let mut network_senders = HashMap::new();
let mut peer_manager_request_receivers = HashMap::new();
for network_id in network_ids {
// Create the request managers
let queue_cfg = aptos_channel::Config::new(10).queue_style(QueueStyle::FIFO);
let (peer_manager_request_sender, peer_manager_request_receiver) = queue_cfg.build();
let (connected_request_sender, _) = queue_cfg.build();

// Create the network sender
let network_sender = NetworkSender::new(
PeerManagerRequestSender::new(peer_manager_request_sender),
ConnectionRequestSender::new(connected_request_sender),
);

// Save the network sender and the request receiver
network_senders.insert(*network_id, network_sender);
peer_manager_request_receivers.insert(*network_id, peer_manager_request_receiver);
}

// Create the network client
let peers_and_metadata = PeersAndMetadata::new(network_ids);
let network_client =
NetworkClient::new(vec![], vec![], hashmap![], peers_and_metadata.clone());
let network_client = NetworkClient::new(
vec![ProtocolId::ConsensusObserver],
vec![ProtocolId::ConsensusObserverRpc],
network_senders,
peers_and_metadata.clone(),
);

// Create the consensus observer client
let consensus_observer_client = Arc::new(ConsensusObserverClient::new(network_client));

(peers_and_metadata, consensus_observer_client)
(
peers_and_metadata,
consensus_observer_client,
peer_manager_request_receivers,
)
}

/// Creates a new peer with the specified connection metadata
Expand Down Expand Up @@ -754,6 +980,76 @@ mod tests {
peers_and_metadata
}

/// Returns the distance from the validators for the specified network
fn get_distance_from_validators(network_id: &NetworkId) -> u64 {
match network_id {
NetworkId::Validator => 0,
NetworkId::Vfn => 1,
NetworkId::Public => 2,
}
}

/// Fetches and handles the next subscription request from the peer manager
async fn handle_next_subscription_request(
network_id: NetworkId,
peer_manager_request_receivers: &mut HashMap<
NetworkId,
aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>,
>,
return_successfully: bool,
) {
// Get the request receiver for the given network
let peer_manager_request_receiver =
peer_manager_request_receivers.get_mut(&network_id).unwrap();

// Wait for the next subscription request
match peer_manager_request_receiver.next().await {
Some(PeerManagerRequest::SendRpc(_, network_request)) => {
// Parse the network request
let data = network_request.data;
let response_sender = network_request.res_tx;
let message: ConsensusObserverMessage = bcs::from_bytes(data.as_ref()).unwrap();

// Process the network message
match message {
ConsensusObserverMessage::Request(request) => {
// Verify the request is for a new subscription
match request {
ConsensusObserverRequest::Subscribe => (),
_ => panic!(
"Unexpected consensus observer request received: {:?}!",
request
),
}

// Determine the response to send
let response = if return_successfully {
// Ack the subscription request
ConsensusObserverResponse::SubscribeAck
} else {
// Respond with the wrong message type
ConsensusObserverResponse::UnsubscribeAck
};
let response_message = ConsensusObserverMessage::Response(response);

// Send the response to the peer
let response_bytes =
bcs::to_bytes(&response_message).map(Bytes::from).unwrap();
let _ = response_sender.send(Ok(response_bytes));
},
_ => panic!(
"Unexpected consensus observer message type received: {:?}!",
message
),
}
},
Some(PeerManagerRequest::SendDirectSend(_, _)) => {
panic!("Unexpected direct send message received!")
},
None => panic!("No subscription request received!"),
}
}

/// Removes the peer and connection metadata for the given peer
fn remove_peer_and_connection(
peers_and_metadata: Arc<PeersAndMetadata>,
Expand Down

0 comments on commit ac75a80

Please sign in to comment.