diff --git a/core/src/validator.rs b/core/src/validator.rs index 8d4ce24d5bbbc4..8b72ebe179c3d2 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -871,8 +871,11 @@ impl Validator { let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded(); let (cluster_confirmed_slot_sender, cluster_confirmed_slot_receiver) = unbounded(); - let rpc_completed_slots_service = - RpcCompletedSlotsService::spawn(completed_slots_receiver, rpc_subscriptions.clone()); + let rpc_completed_slots_service = RpcCompletedSlotsService::spawn( + completed_slots_receiver, + rpc_subscriptions.clone(), + exit.clone(), + ); let (replay_vote_sender, replay_vote_receiver) = unbounded(); let tvu = Tvu::new( @@ -1813,7 +1816,11 @@ mod tests { crossbeam_channel::{bounded, RecvTimeoutError}, solana_ledger::{create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader}, solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig}, +<<<<<<< HEAD std::{fs::remove_dir_all, thread}, +======= + std::{fs::remove_dir_all, thread, time::Duration}, +>>>>>>> 51b37f018 (Modify rpc_completed_slot_service to be non-blocking (#24007)) }; #[test] @@ -1948,12 +1955,22 @@ mod tests { // Each validator can exit in parallel to speed many sequential calls to join` validators.iter_mut().for_each(|v| v.exit()); - // While join is called sequentially, the above exit call notified all the - // validators to exit from all their threads - validators.into_iter().for_each(|validator| { - validator.join(); + + // spawn a new thread to wait for the join of the validator + let (sender, receiver) = bounded(0); + let _ = thread::spawn(move || { + validators.into_iter().for_each(|validator| { + validator.join(); + }); + sender.send(()).unwrap(); }); + // timeout of 30s for shutting down the validators + let timeout = Duration::from_secs(30); + if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) { + panic!("timeout for shutting down validators",); + } + for path in ledger_paths { remove_dir_all(path).unwrap(); } diff --git a/rpc/src/rpc_completed_slots_service.rs b/rpc/src/rpc_completed_slots_service.rs index 0f9d393e61b88d..61eabba75a9dae 100644 --- a/rpc/src/rpc_completed_slots_service.rs +++ b/rpc/src/rpc_completed_slots_service.rs @@ -4,21 +4,35 @@ use { solana_ledger::blockstore::CompletedSlotsReceiver, solana_sdk::timing::timestamp, std::{ - sync::Arc, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, thread::{Builder, JoinHandle}, + time::Duration, }, }; +pub const COMPLETE_SLOT_REPORT_SLEEP_MS: u64 = 100; + pub struct RpcCompletedSlotsService; impl RpcCompletedSlotsService { pub fn spawn( completed_slots_receiver: CompletedSlotsReceiver, rpc_subscriptions: Arc, + exit: Arc, ) -> JoinHandle<()> { Builder::new() .name("solana-rpc-completed-slots-service".to_string()) - .spawn(move || { - for slots in completed_slots_receiver.iter() { + .spawn(move || loop { + // shutdown the service + if exit.load(Ordering::Relaxed) { + break; + } + + if let Ok(slots) = completed_slots_receiver + .recv_timeout(Duration::from_millis(COMPLETE_SLOT_REPORT_SLEEP_MS)) + { for slot in slots { rpc_subscriptions.notify_slot_update(SlotUpdate::Completed { slot,