Skip to content

Commit

Permalink
Modify rpc_completed_slot_service to be non-blocking (#24007)
Browse files Browse the repository at this point in the history
* timeout for validator exits

* clippy

* print backtrace when panic

* add backtrace package

* increase time out to 30s

* debug logging

* make rpc complete service non blocking

* reduce log level

* remove logging

* recv_timeout

* remove backtrace

* remove sleep

* remove unused variable

* add comments

* Update core/src/validator.rs

Co-authored-by: Trent Nelson <[email protected]>

* Update core/src/validator.rs

Co-authored-by: Trent Nelson <[email protected]>

* whitespace

* more whitespace

* fix build

Co-authored-by: Trent Nelson <[email protected]>
(cherry picked from commit 51b37f0)

# Conflicts:
#	core/src/validator.rs
  • Loading branch information
HaoranYi authored and mergify[bot] committed May 18, 2022
1 parent 12a12b9 commit de30944
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 9 deletions.
29 changes: 23 additions & 6 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -871,8 +871,11 @@ impl Validator {
let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
let (cluster_confirmed_slot_sender, cluster_confirmed_slot_receiver) = unbounded();

let rpc_completed_slots_service =
RpcCompletedSlotsService::spawn(completed_slots_receiver, rpc_subscriptions.clone());
let rpc_completed_slots_service = RpcCompletedSlotsService::spawn(
completed_slots_receiver,
rpc_subscriptions.clone(),
exit.clone(),
);

let (replay_vote_sender, replay_vote_receiver) = unbounded();
let tvu = Tvu::new(
Expand Down Expand Up @@ -1813,7 +1816,11 @@ mod tests {
crossbeam_channel::{bounded, RecvTimeoutError},
solana_ledger::{create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader},
solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig},
<<<<<<< HEAD
std::{fs::remove_dir_all, thread},
=======
std::{fs::remove_dir_all, thread, time::Duration},
>>>>>>> 51b37f018 (Modify rpc_completed_slot_service to be non-blocking (#24007))
};

#[test]
Expand Down Expand Up @@ -1948,12 +1955,22 @@ mod tests {

// Each validator can exit in parallel to speed many sequential calls to join`
validators.iter_mut().for_each(|v| v.exit());
// While join is called sequentially, the above exit call notified all the
// validators to exit from all their threads
validators.into_iter().for_each(|validator| {
validator.join();

// spawn a new thread to wait for the join of the validator
let (sender, receiver) = bounded(0);
let _ = thread::spawn(move || {
validators.into_iter().for_each(|validator| {
validator.join();
});
sender.send(()).unwrap();
});

// timeout of 30s for shutting down the validators
let timeout = Duration::from_secs(30);
if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) {
panic!("timeout for shutting down validators",);
}

for path in ledger_paths {
remove_dir_all(path).unwrap();
}
Expand Down
20 changes: 17 additions & 3 deletions rpc/src/rpc_completed_slots_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,35 @@ use {
solana_ledger::blockstore::CompletedSlotsReceiver,
solana_sdk::timing::timestamp,
std::{
sync::Arc,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{Builder, JoinHandle},
time::Duration,
},
};

pub const COMPLETE_SLOT_REPORT_SLEEP_MS: u64 = 100;

pub struct RpcCompletedSlotsService;
impl RpcCompletedSlotsService {
pub fn spawn(
completed_slots_receiver: CompletedSlotsReceiver,
rpc_subscriptions: Arc<RpcSubscriptions>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
Builder::new()
.name("solana-rpc-completed-slots-service".to_string())
.spawn(move || {
for slots in completed_slots_receiver.iter() {
.spawn(move || loop {
// shutdown the service
if exit.load(Ordering::Relaxed) {
break;
}

if let Ok(slots) = completed_slots_receiver
.recv_timeout(Duration::from_millis(COMPLETE_SLOT_REPORT_SLEEP_MS))
{
for slot in slots {
rpc_subscriptions.notify_slot_update(SlotUpdate::Completed {
slot,
Expand Down

0 comments on commit de30944

Please sign in to comment.