diff --git a/consensus/src/consensus_observer/observer/subscription_manager.rs b/consensus/src/consensus_observer/observer/subscription_manager.rs index deba8d52a98e6b..2d89163e1ae867 100644 --- a/consensus/src/consensus_observer/observer/subscription_manager.rs +++ b/consensus/src/consensus_observer/observer/subscription_manager.rs @@ -430,6 +430,95 @@ mod test { } } + #[tokio::test] + async fn test_check_and_manage_subscriptions() { + // Create a consensus observer client + let network_id = NetworkId::Public; + let (peers_and_metadata, consensus_observer_client) = + create_consensus_observer_client(&[network_id]); + + // Create a new subscription manager + let consensus_observer_config = ConsensusObserverConfig::default(); + let db_reader = create_mock_db_reader(); + let time_service = TimeService::mock(); + let mut subscription_manager = SubscriptionManager::new( + consensus_observer_client, + consensus_observer_config, + None, + db_reader.clone(), + time_service.clone(), + ); + + // Verify that no subscriptions are active + verify_active_subscription_peers(&subscription_manager, vec![]); + + // Check and manage the subscriptions + let result = subscription_manager.check_and_manage_subscriptions().await; + + // Verify that no subscriptions were terminated + assert!(result.is_ok()); + verify_active_subscription_peers(&subscription_manager, vec![]); + + // Add a new connected peer and subscription + let connected_peer_1 = + create_peer_and_connection(network_id, peers_and_metadata.clone(), 1, None, true); + create_observer_subscription( + &mut subscription_manager, + consensus_observer_config, + db_reader.clone(), + connected_peer_1, + time_service.clone(), + ); + + // Add another connected peer and subscription + let connected_peer_2 = + create_peer_and_connection(network_id, peers_and_metadata.clone(), 2, None, true); + create_observer_subscription( + &mut subscription_manager, + consensus_observer_config, + db_reader.clone(), + connected_peer_2, + TimeService::mock(), // Use a different time service (to avoid timeouts!) + ); + + // Check and manage the subscriptions + subscription_manager + .check_and_manage_subscriptions() + .await + .unwrap(); + + // Verify that the subscriptions are still active + verify_active_subscription_peers(&subscription_manager, vec![ + connected_peer_1, + connected_peer_2, + ]); + + // Elapse time to simulate a timeout for peer 1 + let mock_time_service = time_service.into_mock(); + mock_time_service.advance(Duration::from_millis( + consensus_observer_config.max_subscription_timeout_ms + 1, + )); + + // Check and manage the subscriptions + subscription_manager + .check_and_manage_subscriptions() + .await + .unwrap(); + + // Verify that the first subscription was terminated + verify_active_subscription_peers(&subscription_manager, vec![connected_peer_2]); + + // Disconnect the second peer + remove_peer_and_connection(peers_and_metadata.clone(), connected_peer_2); + + // Check and manage the subscriptions + let result = subscription_manager.check_and_manage_subscriptions().await; + + // Verify that the second subscription was terminated and an error was returned + verify_active_subscription_peers(&subscription_manager, vec![]); + assert_matches!(result, Err(Error::SubscriptionsReset(_))); + } + #[tokio::test] async fn test_check_subscription_health_connected() { // Create a consensus observer client @@ -461,11 +550,8 @@ mod test { // Check the active subscription and verify that it unhealthy (the peer is not connected) check_subscription_connection(&mut subscription_manager, peer_network_id, false); - // Terminate the subscription - let terminated_subscriptions = - terminate_any_unhealthy_subscriptions(&mut subscription_manager); - assert_eq!(terminated_subscriptions.len(), 1); - assert_eq!(terminated_subscriptions.first().unwrap().0, peer_network_id); + // Terminate unhealthy subscriptions and verify the subscription was removed + verify_terminated_unhealthy_subscriptions(&mut subscription_manager, vec![peer_network_id]); // Add a new connected peer let connected_peer = @@ -480,13 +566,14 @@ mod test { TimeService::mock(), ); - // Check the active subscriptions is still healthy + // Check the active subscription is still healthy check_subscription_connection(&mut subscription_manager, connected_peer, true); + // Terminate unhealthy subscriptions and verify none are removed + verify_terminated_unhealthy_subscriptions(&mut subscription_manager, vec![]); + // Verify that the active subscription is still present - assert!(subscription_manager - .get_active_subscription_peers() - .contains(&connected_peer)); + verify_active_subscription_peers(&subscription_manager, vec![connected_peer]); } #[tokio::test] @@ -529,6 +616,9 @@ mod test { // Check the active subscription and verify that it is healthy check_subscription_progress(&mut subscription_manager, connected_peer, true); + // Terminate unhealthy subscriptions and verify none are removed + verify_terminated_unhealthy_subscriptions(&mut subscription_manager, vec![]); + // Elapse time to simulate a DB progress error let mock_time_service = time_service.clone().into_mock(); mock_time_service.advance(Duration::from_millis( @@ -538,16 +628,11 @@ mod test { // Check the active subscription and verify that it is unhealthy (the DB is not syncing) check_subscription_progress(&mut subscription_manager, connected_peer, false); - // Terminate the subscription - let terminated_subscriptions = - terminate_any_unhealthy_subscriptions(&mut subscription_manager); - assert_eq!(terminated_subscriptions.len(), 1); - assert_eq!(terminated_subscriptions.first().unwrap().0, connected_peer); + // Terminate unhealthy subscriptions and verify the subscription was removed + verify_terminated_unhealthy_subscriptions(&mut subscription_manager, vec![connected_peer]); // Verify the active subscription is no longer present - assert!(subscription_manager - .get_active_subscription_peers() - .is_empty()); + verify_active_subscription_peers(&subscription_manager, vec![]); } #[tokio::test] @@ -585,6 +670,9 @@ mod test { // Check the active subscription and verify that it is healthy check_subscription_timeout(&mut subscription_manager, connected_peer, true); + // Terminate unhealthy subscriptions and verify none are removed + verify_terminated_unhealthy_subscriptions(&mut subscription_manager, vec![]); + // Elapse time to simulate a timeout let mock_time_service = time_service.clone().into_mock(); mock_time_service.advance(Duration::from_millis( @@ -594,16 +682,11 @@ mod test { // Check the active subscription and verify that it is unhealthy (the subscription timed out) check_subscription_timeout(&mut subscription_manager, connected_peer, false); - // Terminate the subscription - let terminated_subscriptions = - terminate_any_unhealthy_subscriptions(&mut subscription_manager); - assert_eq!(terminated_subscriptions.len(), 1); - assert_eq!(terminated_subscriptions.first().unwrap().0, connected_peer); + // Terminate unhealthy subscriptions and verify the subscription was removed + verify_terminated_unhealthy_subscriptions(&mut subscription_manager, vec![connected_peer]); // Verify the active subscription is no longer present - assert!(subscription_manager - .get_active_subscription_peers() - .is_empty()); + verify_active_subscription_peers(&subscription_manager, vec![]); } #[tokio::test] @@ -651,6 +734,9 @@ mod test { // Check the active subscription and verify that it is healthy check_subscription_optimality(&mut subscription_manager, suboptimal_peer, true); + // Terminate unhealthy subscriptions and verify none are removed + verify_terminated_unhealthy_subscriptions(&mut subscription_manager, vec![]); + // Elapse enough time to trigger the peer optimality check let mock_time_service = time_service.clone().into_mock(); mock_time_service.advance(Duration::from_millis( @@ -666,20 +752,89 @@ mod test { consensus_observer_config.subscription_refresh_interval_ms + 1, )); - // Terminate the subscription - let terminated_subscriptions = - terminate_any_unhealthy_subscriptions(&mut subscription_manager); - assert_eq!(terminated_subscriptions.len(), 1); - assert_eq!(terminated_subscriptions.first().unwrap().0, suboptimal_peer); + // Terminate any unhealthy subscriptions and verify the subscription was removed + verify_terminated_unhealthy_subscriptions(&mut subscription_manager, vec![suboptimal_peer]); // Verify the active subscription is no longer present - assert!(subscription_manager - .get_active_subscription_peers() - .is_empty()); + verify_active_subscription_peers(&subscription_manager, vec![]); } #[tokio::test] - async fn test_terminate_unhealthy_subscriptions() { + #[allow(clippy::await_holding_lock)] // Required to wait on the subscription creation task + async fn test_spawn_subscription_creation_task() { + // Create a consensus observer client + let network_id = NetworkId::Public; + let (_, consensus_observer_client) = create_consensus_observer_client(&[network_id]); + + // Create a new subscription manager + let consensus_observer_config = ConsensusObserverConfig::default(); + let db_reader = create_mock_db_reader(); + let time_service = TimeService::mock(); + let mut subscription_manager = SubscriptionManager::new( + consensus_observer_client, + consensus_observer_config, + None, + db_reader.clone(), + time_service.clone(), + ); + + // Verify that the active subscription creation task is empty + verify_subscription_creation_task(&subscription_manager, false); + + // Spawn a subscription creation task with 0 subscriptions to create + subscription_manager + .spawn_subscription_creation_task(0, vec![], vec![], hashmap![]) + .await; + + // Verify that the active subscription creation task is still empty (no task was spawned) + verify_subscription_creation_task(&subscription_manager, false); + + // Spawn a subscription creation task with 1 subscription to create + subscription_manager + .spawn_subscription_creation_task(1, vec![], vec![], hashmap![]) + .await; + + // Verify that the active subscription creation task is now populated + verify_subscription_creation_task(&subscription_manager, true); + + // Wait for the active subscription creation task to finish + if let Some(active_task) = subscription_manager + .active_subscription_creation_task + .lock() + .as_mut() + { + active_task.await.unwrap(); + } + + // Verify that the active subscription creation task is still present + verify_subscription_creation_task(&subscription_manager, true); + + // Verify that the active subscription creation task is finished + if let Some(active_task) = subscription_manager + .active_subscription_creation_task + .lock() + .as_ref() + { + assert!(active_task.is_finished()); + } + + // Spawn a subscription creation task with 2 subscriptions to create + subscription_manager + .spawn_subscription_creation_task(2, vec![], vec![], hashmap![]) + .await; + + // Verify the new active subscription creation task is not finished + if let Some(active_task) = subscription_manager + .active_subscription_creation_task + .lock() + .as_ref() + { + assert!(!active_task.is_finished()); + }; + } + + #[tokio::test] + async fn test_terminate_unhealthy_subscriptions_multiple() { // Create a consensus observer client let network_id = NetworkId::Public; let (peers_and_metadata, consensus_observer_client) = @@ -713,14 +868,8 @@ mod test { ); } - // Terminate any unhealthy subscriptions and verify that both subscriptions are still healthy - let terminated_subscriptions = - terminate_any_unhealthy_subscriptions(&mut subscription_manager); - assert!(terminated_subscriptions.is_empty()); - assert_eq!( - subscription_manager.get_active_subscription_peers().len(), - 2 - ); + // Terminate unhealthy subscriptions and verify that both subscriptions are still healthy + verify_terminated_unhealthy_subscriptions(&mut subscription_manager, vec![]); // Create another subscription let subscription_peer_3 = @@ -739,18 +888,14 @@ mod test { consensus_observer_config.max_subscription_timeout_ms + 1, )); - // Terminate the unhealthy subscriptions and verify the first two subscriptions were terminated - let terminated_subscriptions = - terminate_any_unhealthy_subscriptions(&mut subscription_manager); - assert_eq!(terminated_subscriptions.len(), 2); - assert_eq!(subscription_manager.get_active_subscription_peers(), vec![ - subscription_peer_3 + // Terminate unhealthy subscriptions and verify the first two subscriptions were terminated + verify_terminated_unhealthy_subscriptions(&mut subscription_manager, vec![ + subscription_peer_1, + subscription_peer_2, ]); - // Verify that both subscriptions were terminated due to a timeout - for (_, error) in terminated_subscriptions { - assert_matches!(error, Error::SubscriptionTimeout(_)); - } + // Verify the third subscription is still active + verify_active_subscription_peers(&subscription_manager, vec![subscription_peer_3]); } #[tokio::test] @@ -771,9 +916,7 @@ mod test { ); // Verify that no subscriptions are active - assert!(subscription_manager - .get_active_subscription_peers() - .is_empty()); + verify_active_subscription_peers(&subscription_manager, vec![]); // Create a new subscription let subscription_peer_1 = PeerNetworkId::random(); @@ -786,9 +929,7 @@ mod test { ); // Verify the subscription is active - assert!(subscription_manager - .get_active_subscription_peers() - .contains(&subscription_peer_1)); + verify_active_subscription_peers(&subscription_manager, vec![subscription_peer_1]); // Create another subscription let subscription_peer_2 = PeerNetworkId::random(); @@ -801,26 +942,16 @@ mod test { ); // Verify the second subscription is active - assert!(subscription_manager - .get_active_subscription_peers() - .contains(&subscription_peer_2)); + verify_active_subscription_peers(&subscription_manager, vec![ + subscription_peer_1, + subscription_peer_2, + ]); // Unsubscribe from the first peer subscription_manager.unsubscribe_from_peer(subscription_peer_1); // Verify that the first subscription is no longer active - assert!(!subscription_manager - .get_active_subscription_peers() - .contains(&subscription_peer_1)); - - // Verify that only the second subscription is still active - assert!(subscription_manager - .get_active_subscription_peers() - .contains(&subscription_peer_2)); - assert_eq!( - subscription_manager.get_active_subscription_peers().len(), - 1 - ); + verify_active_subscription_peers(&subscription_manager, vec![subscription_peer_2]); } #[tokio::test] @@ -1077,14 +1208,66 @@ mod test { peer_network_id } - /// A simple helper method that terminates any unhealthy subscriptions - fn terminate_any_unhealthy_subscriptions( + /// Removes the peer and connection metadata for the given peer + fn remove_peer_and_connection( + peers_and_metadata: Arc, + peer_network_id: PeerNetworkId, + ) { + let peer_metadata = peers_and_metadata + .get_metadata_for_peer(peer_network_id) + .unwrap(); + let connection_id = peer_metadata.get_connection_metadata().connection_id; + peers_and_metadata + .remove_peer_metadata(peer_network_id, connection_id) + .unwrap(); + } + + /// Verifies the active subscription peers + fn verify_active_subscription_peers( + subscription_manager: &SubscriptionManager, + expected_active_peers: Vec, + ) { + // Get the active subscription peers + let active_peers = subscription_manager.get_active_subscription_peers(); + + // Verify the active subscription peers + for peer in &expected_active_peers { + assert!(active_peers.contains(peer)); + } + assert_eq!(active_peers.len(), expected_active_peers.len()); + } + + /// Verifies the status of the active subscription creation task + fn verify_subscription_creation_task( + subscription_manager: &SubscriptionManager, + expect_active_task: bool, + ) { + let current_active_task = subscription_manager + .active_subscription_creation_task + .lock() + .is_some(); + assert_eq!(current_active_task, expect_active_task); + } + + /// Verifies the list of terminated unhealthy subscriptions + fn verify_terminated_unhealthy_subscriptions( subscription_manager: &mut SubscriptionManager, - ) -> Vec<(PeerNetworkId, Error)> { + expected_terminated_peers: Vec, + ) { // Get the connected peers and metadata let connected_peers_and_metadata = subscription_manager.get_connected_peers_and_metadata(); // Terminate any unhealthy subscriptions - subscription_manager.terminate_unhealthy_subscriptions(&connected_peers_and_metadata) + let terminated_subscriptions = + subscription_manager.terminate_unhealthy_subscriptions(&connected_peers_and_metadata); + + // Verify the terminated subscriptions + for (terminated_subscription_peer, _) in &terminated_subscriptions { + assert!(expected_terminated_peers.contains(terminated_subscription_peer)); + } + assert_eq!( + terminated_subscriptions.len(), + expected_terminated_peers.len() + ); } }