diff --git a/rpc-test/tests/rpc.rs b/rpc-test/tests/rpc.rs index d6c68d9425f047..a4e704232a3b20 100644 --- a/rpc-test/tests/rpc.rs +++ b/rpc-test/tests/rpc.rs @@ -33,7 +33,10 @@ use { std::{ collections::HashSet, net::UdpSocket, - sync::Arc, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, thread::sleep, time::{Duration, Instant}, }, @@ -264,15 +267,13 @@ fn test_rpc_subscriptions() { .collect(); let mut signature_set: HashSet = transactions.iter().map(|tx| tx.signatures[0]).collect(); - let account_set: HashSet = transactions + let mut account_set: HashSet = transactions .iter() .map(|tx| tx.message.account_keys[1]) .collect(); - // Track when subscriptions are ready - let (ready_sender, ready_receiver) = unbounded::<()>(); // Track account notifications are received - let (account_sender, account_receiver) = unbounded::>(); + let (account_sender, account_receiver) = unbounded::<(Pubkey, RpcResponse)>(); // Track when status notifications are received let (status_sender, status_receiver) = unbounded::<(Signature, RpcResponse)>(); @@ -281,12 +282,19 @@ fn test_rpc_subscriptions() { let rt = Runtime::new().unwrap(); let rpc_pubsub_url = test_validator.rpc_pubsub_url(); let signature_set_clone = signature_set.clone(); + let account_set_clone = account_set.clone(); + let signature_subscription_ready = Arc::new(AtomicUsize::new(0)); + let account_subscription_ready = Arc::new(AtomicUsize::new(0)); + let signature_subscription_ready_clone = signature_subscription_ready.clone(); + let account_subscription_ready_clone = account_subscription_ready.clone(); + rt.spawn(async move { let pubsub_client = Arc::new(PubsubClient::new(&rpc_pubsub_url).await.unwrap()); // Subscribe to signature notifications for signature in signature_set_clone { let status_sender = status_sender.clone(); + let signature_subscription_ready_clone = signature_subscription_ready_clone.clone(); tokio::spawn({ let _pubsub_client = Arc::clone(&pubsub_client); async move { @@ -301,6 +309,8 @@ fn test_rpc_subscriptions() { .await .unwrap(); + signature_subscription_ready_clone.fetch_add(1, Ordering::SeqCst); + let response = sig_notifications.next().await.unwrap(); status_sender.send((signature, response)).unwrap(); sig_unsubscribe().await; @@ -309,8 +319,9 @@ fn test_rpc_subscriptions() { } // Subscribe to account notifications - for pubkey in account_set { + for pubkey in account_set_clone { let account_sender = account_sender.clone(); + let account_subscription_ready_clone = account_subscription_ready_clone.clone(); tokio::spawn({ let _pubsub_client = Arc::clone(&pubsub_client); async move { @@ -325,28 +336,43 @@ fn test_rpc_subscriptions() { .await .unwrap(); + account_subscription_ready_clone.fetch_add(1, Ordering::SeqCst); + let response = account_notifications.next().await.unwrap(); - account_sender.send(response).unwrap(); + account_sender.send((pubkey, response)).unwrap(); account_unsubscribe().await; } }); } - - // Signal ready after the next slot notification - tokio::spawn({ - let _pubsub_client = Arc::clone(&pubsub_client); - async move { - let (mut slot_notifications, slot_unsubscribe) = - _pubsub_client.slot_subscribe().await.unwrap(); - let _response = slot_notifications.next().await.unwrap(); - ready_sender.send(()).unwrap(); - slot_unsubscribe().await; - } - }); }); - // Wait for signature subscriptions - ready_receiver.recv_timeout(Duration::from_secs(2)).unwrap(); + let now = Instant::now(); + while (signature_subscription_ready.load(Ordering::SeqCst) != transactions.len() + || account_subscription_ready.load(Ordering::SeqCst) != transactions.len()) + && now.elapsed() < Duration::from_secs(15) + { + sleep(Duration::from_millis(100)) + } + + // check signature subscription + let num = signature_subscription_ready.load(Ordering::SeqCst); + if num != transactions.len() { + error!( + "signature subscription didn't setup properly, want: {}, got: {}", + transactions.len(), + num + ); + } + + // check account subscription + let num = account_subscription_ready.load(Ordering::SeqCst); + if num != transactions.len() { + error!( + "account subscriptions didn't setup properly, want: {}, got: {}", + transactions.len(), + num + ); + } let rpc_client = RpcClient::new(test_validator.rpc_url()); let mut mint_balance = rpc_client @@ -406,18 +432,17 @@ fn test_rpc_subscriptions() { } let deadline = Instant::now() + Duration::from_secs(60); - let mut account_notifications = transactions.len(); - while account_notifications > 0 { + while !account_set.is_empty() { let timeout = deadline.saturating_duration_since(Instant::now()); match account_receiver.recv_timeout(timeout) { - Ok(result) => { + Ok((pubkey, result)) => { assert_eq!(result.value.lamports, Rent::default().minimum_balance(0)); - account_notifications -= 1; + assert!(account_set.remove(&pubkey)); } Err(_err) => { panic!( "recv_timeout, {}/{} accounts remaining", - account_notifications, + account_set.len(), transactions.len() ); }