Skip to content

Commit

Permalink
fix test_rpc_subscriptions (solana-labs#27394)
Browse files Browse the repository at this point in the history
* add pubkey to account sender/receiver

* send tx when all subscription ready

* make variables shorter

* add 15 sec timeout for waiting signature/account subscription setup
  • Loading branch information
yihau authored and pull[bot] committed Feb 1, 2024
1 parent deb50be commit 05cf761
Showing 1 changed file with 51 additions and 26 deletions.
77 changes: 51 additions & 26 deletions rpc-test/tests/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ use {
std::{
collections::HashSet,
net::UdpSocket,
sync::Arc,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
thread::sleep,
time::{Duration, Instant},
},
Expand Down Expand Up @@ -264,15 +267,13 @@ fn test_rpc_subscriptions() {
.collect();
let mut signature_set: HashSet<Signature> =
transactions.iter().map(|tx| tx.signatures[0]).collect();
let account_set: HashSet<Pubkey> = transactions
let mut account_set: HashSet<Pubkey> = transactions
.iter()
.map(|tx| tx.message.account_keys[1])
.collect();

// Track when subscriptions are ready
let (ready_sender, ready_receiver) = unbounded::<()>();
// Track account notifications are received
let (account_sender, account_receiver) = unbounded::<RpcResponse<UiAccount>>();
let (account_sender, account_receiver) = unbounded::<(Pubkey, RpcResponse<UiAccount>)>();
// Track when status notifications are received
let (status_sender, status_receiver) =
unbounded::<(Signature, RpcResponse<RpcSignatureResult>)>();
Expand All @@ -281,12 +282,19 @@ fn test_rpc_subscriptions() {
let rt = Runtime::new().unwrap();
let rpc_pubsub_url = test_validator.rpc_pubsub_url();
let signature_set_clone = signature_set.clone();
let account_set_clone = account_set.clone();
let signature_subscription_ready = Arc::new(AtomicUsize::new(0));
let account_subscription_ready = Arc::new(AtomicUsize::new(0));
let signature_subscription_ready_clone = signature_subscription_ready.clone();
let account_subscription_ready_clone = account_subscription_ready.clone();

rt.spawn(async move {
let pubsub_client = Arc::new(PubsubClient::new(&rpc_pubsub_url).await.unwrap());

// Subscribe to signature notifications
for signature in signature_set_clone {
let status_sender = status_sender.clone();
let signature_subscription_ready_clone = signature_subscription_ready_clone.clone();
tokio::spawn({
let _pubsub_client = Arc::clone(&pubsub_client);
async move {
Expand All @@ -301,6 +309,8 @@ fn test_rpc_subscriptions() {
.await
.unwrap();

signature_subscription_ready_clone.fetch_add(1, Ordering::SeqCst);

let response = sig_notifications.next().await.unwrap();
status_sender.send((signature, response)).unwrap();
sig_unsubscribe().await;
Expand All @@ -309,8 +319,9 @@ fn test_rpc_subscriptions() {
}

// Subscribe to account notifications
for pubkey in account_set {
for pubkey in account_set_clone {
let account_sender = account_sender.clone();
let account_subscription_ready_clone = account_subscription_ready_clone.clone();
tokio::spawn({
let _pubsub_client = Arc::clone(&pubsub_client);
async move {
Expand All @@ -325,28 +336,43 @@ fn test_rpc_subscriptions() {
.await
.unwrap();

account_subscription_ready_clone.fetch_add(1, Ordering::SeqCst);

let response = account_notifications.next().await.unwrap();
account_sender.send(response).unwrap();
account_sender.send((pubkey, response)).unwrap();
account_unsubscribe().await;
}
});
}

// Signal ready after the next slot notification
tokio::spawn({
let _pubsub_client = Arc::clone(&pubsub_client);
async move {
let (mut slot_notifications, slot_unsubscribe) =
_pubsub_client.slot_subscribe().await.unwrap();
let _response = slot_notifications.next().await.unwrap();
ready_sender.send(()).unwrap();
slot_unsubscribe().await;
}
});
});

// Wait for signature subscriptions
ready_receiver.recv_timeout(Duration::from_secs(2)).unwrap();
let now = Instant::now();
while (signature_subscription_ready.load(Ordering::SeqCst) != transactions.len()
|| account_subscription_ready.load(Ordering::SeqCst) != transactions.len())
&& now.elapsed() < Duration::from_secs(15)
{
sleep(Duration::from_millis(100))
}

// check signature subscription
let num = signature_subscription_ready.load(Ordering::SeqCst);
if num != transactions.len() {
error!(
"signature subscription didn't setup properly, want: {}, got: {}",
transactions.len(),
num
);
}

// check account subscription
let num = account_subscription_ready.load(Ordering::SeqCst);
if num != transactions.len() {
error!(
"account subscriptions didn't setup properly, want: {}, got: {}",
transactions.len(),
num
);
}

let rpc_client = RpcClient::new(test_validator.rpc_url());
let mut mint_balance = rpc_client
Expand Down Expand Up @@ -406,18 +432,17 @@ fn test_rpc_subscriptions() {
}

let deadline = Instant::now() + Duration::from_secs(60);
let mut account_notifications = transactions.len();
while account_notifications > 0 {
while !account_set.is_empty() {
let timeout = deadline.saturating_duration_since(Instant::now());
match account_receiver.recv_timeout(timeout) {
Ok(result) => {
Ok((pubkey, result)) => {
assert_eq!(result.value.lamports, Rent::default().minimum_balance(0));
account_notifications -= 1;
assert!(account_set.remove(&pubkey));
}
Err(_err) => {
panic!(
"recv_timeout, {}/{} accounts remaining",
account_notifications,
account_set.len(),
transactions.len()
);
}
Expand Down

0 comments on commit 05cf761

Please sign in to comment.