Skip to content

Commit

Permalink
Remove unnecessary arc and mutex for rpc notifications (#8351)
Browse files Browse the repository at this point in the history
  • Loading branch information
jstarry authored Feb 21, 2020
1 parent ab361a8 commit 01697a9
Showing 1 changed file with 50 additions and 56 deletions.
106 changes: 50 additions & 56 deletions core/src/rpc_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use solana_sdk::{
account::Account, clock::Slot, pubkey::Pubkey, signature::Signature, transaction,
};
use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
use std::ops::DerefMut;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, SendError, Sender};
use std::thread::{Builder, JoinHandle};
Expand Down Expand Up @@ -48,8 +47,6 @@ impl std::fmt::Debug for NotificationEntry {
}
}

type NotificationSend = Arc<Mutex<NotificationEntry>>;

type RpcAccountSubscriptions =
RwLock<HashMap<Pubkey, HashMap<SubscriptionId, (Sink<RpcAccount>, Confirmations)>>>;
type RpcProgramSubscriptions =
Expand Down Expand Up @@ -204,7 +201,7 @@ pub struct RpcSubscriptions {
program_subscriptions: Arc<RpcProgramSubscriptions>,
signature_subscriptions: Arc<RpcSignatureSubscriptions>,
slot_subscriptions: Arc<RpcSlotSubscriptions>,
notification_sender: Arc<Mutex<Sender<Arc<Mutex<NotificationEntry>>>>>,
notification_sender: Arc<Mutex<Sender<NotificationEntry>>>,
t_cleanup: Option<JoinHandle<()>>,
exit: Arc<AtomicBool>,
}
Expand All @@ -226,8 +223,8 @@ impl Drop for RpcSubscriptions {
impl RpcSubscriptions {
pub fn new(exit: &Arc<AtomicBool>) -> Self {
let (notification_sender, notification_receiver): (
Sender<NotificationSend>,
Receiver<NotificationSend>,
Sender<NotificationEntry>,
Receiver<NotificationEntry>,
) = std::sync::mpsc::channel();

let account_subscriptions = Arc::new(RpcAccountSubscriptions::default());
Expand Down Expand Up @@ -397,7 +394,7 @@ impl RpcSubscriptions {
.notification_sender
.lock()
.unwrap()
.send(Arc::new(Mutex::new(notification_entry)))
.send(notification_entry)
{
Ok(()) => (),
Err(SendError(notification)) => {
Expand All @@ -411,7 +408,7 @@ impl RpcSubscriptions {

fn process_notifications(
exit: Arc<AtomicBool>,
notification_receiver: Receiver<Arc<Mutex<NotificationEntry>>>,
notification_receiver: Receiver<NotificationEntry>,
account_subscriptions: Arc<RpcAccountSubscriptions>,
program_subscriptions: Arc<RpcProgramSubscriptions>,
signature_subscriptions: Arc<RpcSignatureSubscriptions>,
Expand All @@ -422,57 +419,54 @@ impl RpcSubscriptions {
break;
}
match notification_receiver.recv_timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) {
Ok(notification_entry) => {
let mut notification_entry = notification_entry.lock().unwrap();
match notification_entry.deref_mut() {
NotificationEntry::Slot(slot_info) => {
let subscriptions = slot_subscriptions.read().unwrap();
for (_, sink) in subscriptions.iter() {
sink.notify(Ok(*slot_info)).wait().unwrap();
}
Ok(notification_entry) => match notification_entry {
NotificationEntry::Slot(slot_info) => {
let subscriptions = slot_subscriptions.read().unwrap();
for (_, sink) in subscriptions.iter() {
sink.notify(Ok(slot_info)).wait().unwrap();
}
}
NotificationEntry::Bank((current_slot, bank_forks)) => {
let pubkeys: Vec<_> = {
let subs = account_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for pubkey in &pubkeys {
Self::check_account(
pubkey,
current_slot,
&bank_forks,
account_subscriptions.clone(),
);
}
NotificationEntry::Bank((current_slot, bank_forks)) => {
let pubkeys: Vec<_> = {
let subs = account_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for pubkey in &pubkeys {
Self::check_account(
pubkey,
*current_slot,
&bank_forks,
account_subscriptions.clone(),
);
}

let programs: Vec<_> = {
let subs = program_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for program_id in &programs {
Self::check_program(
program_id,
*current_slot,
&bank_forks,
program_subscriptions.clone(),
);
}

let signatures: Vec<_> = {
let subs = signature_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for signature in &signatures {
Self::check_signature(
signature,
*current_slot,
&bank_forks,
signature_subscriptions.clone(),
);
}

let programs: Vec<_> = {
let subs = program_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for program_id in &programs {
Self::check_program(
program_id,
current_slot,
&bank_forks,
program_subscriptions.clone(),
);
}

let signatures: Vec<_> = {
let subs = signature_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for signature in &signatures {
Self::check_signature(
signature,
current_slot,
&bank_forks,
signature_subscriptions.clone(),
);
}
}
}
},
Err(RecvTimeoutError::Timeout) => {
// not a problem - try reading again
}
Expand Down

0 comments on commit 01697a9

Please sign in to comment.