diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 9c880fd0c4469d..1159e01579fb5d 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -11,7 +11,6 @@ use solana_sdk::{ account::Account, clock::Slot, pubkey::Pubkey, signature::Signature, transaction, }; use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY; -use std::ops::DerefMut; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{Receiver, RecvTimeoutError, SendError, Sender}; use std::thread::{Builder, JoinHandle}; @@ -48,8 +47,6 @@ impl std::fmt::Debug for NotificationEntry { } } -type NotificationSend = Arc>; - type RpcAccountSubscriptions = RwLock, Confirmations)>>>; type RpcProgramSubscriptions = @@ -204,7 +201,7 @@ pub struct RpcSubscriptions { program_subscriptions: Arc, signature_subscriptions: Arc, slot_subscriptions: Arc, - notification_sender: Arc>>>>, + notification_sender: Arc>>, t_cleanup: Option>, exit: Arc, } @@ -226,8 +223,8 @@ impl Drop for RpcSubscriptions { impl RpcSubscriptions { pub fn new(exit: &Arc) -> Self { let (notification_sender, notification_receiver): ( - Sender, - Receiver, + Sender, + Receiver, ) = std::sync::mpsc::channel(); let account_subscriptions = Arc::new(RpcAccountSubscriptions::default()); @@ -397,7 +394,7 @@ impl RpcSubscriptions { .notification_sender .lock() .unwrap() - .send(Arc::new(Mutex::new(notification_entry))) + .send(notification_entry) { Ok(()) => (), Err(SendError(notification)) => { @@ -411,7 +408,7 @@ impl RpcSubscriptions { fn process_notifications( exit: Arc, - notification_receiver: Receiver>>, + notification_receiver: Receiver, account_subscriptions: Arc, program_subscriptions: Arc, signature_subscriptions: Arc, @@ -422,57 +419,54 @@ impl RpcSubscriptions { break; } match notification_receiver.recv_timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) { - Ok(notification_entry) => { - let mut notification_entry = notification_entry.lock().unwrap(); - match notification_entry.deref_mut() { - NotificationEntry::Slot(slot_info) => { - let subscriptions = slot_subscriptions.read().unwrap(); - for (_, sink) in subscriptions.iter() { - sink.notify(Ok(*slot_info)).wait().unwrap(); - } + Ok(notification_entry) => match notification_entry { + NotificationEntry::Slot(slot_info) => { + let subscriptions = slot_subscriptions.read().unwrap(); + for (_, sink) in subscriptions.iter() { + sink.notify(Ok(slot_info)).wait().unwrap(); + } + } + NotificationEntry::Bank((current_slot, bank_forks)) => { + let pubkeys: Vec<_> = { + let subs = account_subscriptions.read().unwrap(); + subs.keys().cloned().collect() + }; + for pubkey in &pubkeys { + Self::check_account( + pubkey, + current_slot, + &bank_forks, + account_subscriptions.clone(), + ); } - NotificationEntry::Bank((current_slot, bank_forks)) => { - let pubkeys: Vec<_> = { - let subs = account_subscriptions.read().unwrap(); - subs.keys().cloned().collect() - }; - for pubkey in &pubkeys { - Self::check_account( - pubkey, - *current_slot, - &bank_forks, - account_subscriptions.clone(), - ); - } - - let programs: Vec<_> = { - let subs = program_subscriptions.read().unwrap(); - subs.keys().cloned().collect() - }; - for program_id in &programs { - Self::check_program( - program_id, - *current_slot, - &bank_forks, - program_subscriptions.clone(), - ); - } - - let signatures: Vec<_> = { - let subs = signature_subscriptions.read().unwrap(); - subs.keys().cloned().collect() - }; - for signature in &signatures { - Self::check_signature( - signature, - *current_slot, - &bank_forks, - signature_subscriptions.clone(), - ); - } + + let programs: Vec<_> = { + let subs = program_subscriptions.read().unwrap(); + subs.keys().cloned().collect() + }; + for program_id in &programs { + Self::check_program( + program_id, + current_slot, + &bank_forks, + program_subscriptions.clone(), + ); + } + + let signatures: Vec<_> = { + let subs = signature_subscriptions.read().unwrap(); + subs.keys().cloned().collect() + }; + for signature in &signatures { + Self::check_signature( + signature, + current_slot, + &bank_forks, + signature_subscriptions.clone(), + ); } } - } + }, Err(RecvTimeoutError::Timeout) => { // not a problem - try reading again }