Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Modify rpc_completed_slot_service to be non-blocking #24007

Merged
merged 19 commits into from
Mar 31, 2022
Merged
28 changes: 21 additions & 7 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -872,8 +872,11 @@ impl Validator {
let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
let (cluster_confirmed_slot_sender, cluster_confirmed_slot_receiver) = unbounded();

let rpc_completed_slots_service =
RpcCompletedSlotsService::spawn(completed_slots_receiver, rpc_subscriptions.clone());
let rpc_completed_slots_service = RpcCompletedSlotsService::spawn(
completed_slots_receiver,
rpc_subscriptions.clone(),
exit.clone(),
);

let (replay_vote_sender, replay_vote_receiver) = unbounded();
let tvu = Tvu::new(
Expand Down Expand Up @@ -1806,9 +1809,10 @@ pub fn is_snapshot_config_valid(
mod tests {
use {
super::*,
crossbeam_channel::{bounded, RecvTimeoutError},
solana_ledger::{create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader},
solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig},
std::fs::remove_dir_all,
std::{fs::remove_dir_all, thread, time::Duration},
};

fn validator_exit() {
Expand Down Expand Up @@ -1926,12 +1930,22 @@ mod tests {

// Each validator can exit in parallel to speed many sequential calls to join`
validators.iter_mut().for_each(|v| v.exit());
// While join is called sequentially, the above exit call notified all the
// validators to exit from all their threads
validators.into_iter().for_each(|validator| {
validator.join();

// spawn a new thread to wait for the join of the validator
let (sender, receiver) = bounded(0);
let _ = thread::spawn(move || {
validators.into_iter().for_each(|validator| {
validator.join();
});
sender.send(()).unwrap();
});

// timeout of 30s for shutting down the validators
let timeout = Duration::from_secs(30);
if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) {
panic!("timeout for shutting down validators",);
}

for path in ledger_paths {
remove_dir_all(path).unwrap();
}
Expand Down
20 changes: 17 additions & 3 deletions rpc/src/rpc_completed_slots_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,35 @@ use {
solana_ledger::blockstore::CompletedSlotsReceiver,
solana_sdk::timing::timestamp,
std::{
sync::Arc,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{Builder, JoinHandle},
time::Duration,
},
};

pub const COMPLETE_SLOT_REPORT_SLEEP_MS: u64 = 100;

pub struct RpcCompletedSlotsService;
impl RpcCompletedSlotsService {
pub fn spawn(
completed_slots_receiver: CompletedSlotsReceiver,
rpc_subscriptions: Arc<RpcSubscriptions>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
Builder::new()
.name("solana-rpc-completed-slots-service".to_string())
.spawn(move || {
for slots in completed_slots_receiver.iter() {
.spawn(move || loop {
// shutdown the service
if exit.load(Ordering::Relaxed) {
break;
}

if let Ok(slots) = completed_slots_receiver
.recv_timeout(Duration::from_millis(COMPLETE_SLOT_REPORT_SLEEP_MS))
{
for slot in slots {
rpc_subscriptions.notify_slot_update(SlotUpdate::Completed {
slot,
Expand Down