Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify rpc_completed_slot_service to be non-blocking #24007

Merged
merged 19 commits into from
Mar 31, 2022
Merged
33 changes: 26 additions & 7 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -872,8 +872,11 @@ impl Validator {
let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
let (cluster_confirmed_slot_sender, cluster_confirmed_slot_receiver) = unbounded();

let rpc_completed_slots_service =
RpcCompletedSlotsService::spawn(completed_slots_receiver, rpc_subscriptions.clone());
let rpc_completed_slots_service = RpcCompletedSlotsService::spawn(
completed_slots_receiver,
rpc_subscriptions.clone(),
exit.clone(),
);

let (replay_vote_sender, replay_vote_receiver) = unbounded();
let tvu = Tvu::new(
Expand Down Expand Up @@ -1093,17 +1096,23 @@ impl Validator {
}

self.gossip_service.join().expect("gossip_service");

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can probably do without the whitespace changes in this set

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup. done. thanks!

self.serve_repair_service
.join()
.expect("serve_repair_service");

self.stats_reporter_service
.join()
.expect("stats_reporter_service");

self.tpu.join().expect("tpu");

self.tvu.join().expect("tvu");

self.completed_data_sets_service
.join()
.expect("completed_data_sets_service");

if let Some(ip_echo_server) = self.ip_echo_server {
ip_echo_server.shutdown_background();
}
Expand Down Expand Up @@ -1806,9 +1815,10 @@ pub fn is_snapshot_config_valid(
mod tests {
use {
super::*,
crossbeam_channel::unbounded,
solana_ledger::{create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader},
solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig},
std::fs::remove_dir_all,
std::{fs::remove_dir_all, thread, time::Duration},
};

fn validator_exit() {
Expand Down Expand Up @@ -1926,12 +1936,21 @@ mod tests {

// Each validator can exit in parallel to speed many sequential calls to join`
validators.iter_mut().for_each(|v| v.exit());
// While join is called sequentially, the above exit call notified all the
// validators to exit from all their threads
validators.into_iter().for_each(|validator| {
validator.join();

// spawn a new thread to wait for the join of the validator
let (sender, receiver) = unbounded();
HaoranYi marked this conversation as resolved.
Show resolved Hide resolved
let _ = thread::spawn(move || {
validators.into_iter().for_each(|validator| {
validator.join();
});
sender.send(()).unwrap();
});

// timeout of 30s for shutting down the validators
if receiver.recv_timeout(Duration::from_secs(30)).is_err() {
HaoranYi marked this conversation as resolved.
Show resolved Hide resolved
panic!("timeout for shutting down validators",);
}

for path in ledger_paths {
remove_dir_all(path).unwrap();
}
Expand Down
20 changes: 17 additions & 3 deletions rpc/src/rpc_completed_slots_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,35 @@ use {
solana_ledger::blockstore::CompletedSlotsReceiver,
solana_sdk::timing::timestamp,
std::{
sync::Arc,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{Builder, JoinHandle},
time::Duration,
},
};

pub const COMPLETE_SLOT_REPORT_SLEEP_MS: u64 = 100;

pub struct RpcCompletedSlotsService;
impl RpcCompletedSlotsService {
pub fn spawn(
completed_slots_receiver: CompletedSlotsReceiver,
rpc_subscriptions: Arc<RpcSubscriptions>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
Builder::new()
.name("solana-rpc-completed-slots-service".to_string())
.spawn(move || {
for slots in completed_slots_receiver.iter() {
.spawn(move || loop {
// shutdown the service
if exit.load(Ordering::Relaxed) {
break;
}

if let Ok(slots) = completed_slots_receiver
.recv_timeout(Duration::from_millis(COMPLETE_SLOT_REPORT_SLEEP_MS))
{
for slot in slots {
rpc_subscriptions.notify_slot_update(SlotUpdate::Completed {
slot,
Expand Down