Skip to content

Commit

Permalink
Apply cosmetic changes to unified scheduler (#1861)
Browse files Browse the repository at this point in the history
* Apply cosmetic changes to unified scheduler

* Use first instead of old-fashioned firstly

Co-authored-by: Andrew Fitzgerald <[email protected]>

---------

Co-authored-by: Andrew Fitzgerald <[email protected]>
  • Loading branch information
ryoqun and apfitzge committed Jun 26, 2024
1 parent a107d53 commit 008a899
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 24 deletions.
9 changes: 5 additions & 4 deletions runtime/src/installed_scheduler_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ use {
log::*,
solana_program_runtime::timings::ExecuteTimings,
solana_sdk::{
clock::Slot,
hash::Hash,
slot_history::Slot,
transaction::{Result, SanitizedTransaction, TransactionError},
},
std::{
fmt::{self, Debug},
mem,
ops::Deref,
sync::{Arc, RwLock},
thread,
},
};
#[cfg(feature = "dev-context-only-utils")]
Expand Down Expand Up @@ -623,7 +624,7 @@ impl BankWithSchedulerInner {
"wait_for_scheduler_termination(slot: {}, reason: {:?}): started at {:?}...",
bank.slot(),
reason,
std::thread::current(),
thread::current(),
);

let mut scheduler = scheduler.write().unwrap();
Expand Down Expand Up @@ -656,7 +657,7 @@ impl BankWithSchedulerInner {
reason,
was_noop,
result_with_timings.as_ref().map(|(result, _)| result),
std::thread::current(),
thread::current(),
);
trace!(
"wait_for_scheduler_termination(result_with_timings: {:?})",
Expand All @@ -667,7 +668,7 @@ impl BankWithSchedulerInner {
}

fn drop_scheduler(&self) {
if std::thread::panicking() {
if thread::panicking() {
error!(
"BankWithSchedulerInner::drop_scheduler(): slot: {} skipping due to already panicking...",
self.bank.slot(),
Expand Down
38 changes: 18 additions & 20 deletions unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
// 4. the handler thread processes the dispatched task.
// 5. the handler thread reply back to the scheduler thread as an executed task.
// 6. the scheduler thread post-processes the executed task.
let scheduler_main_loop = || {
let scheduler_main_loop = {
let handler_count = self.pool.handler_count;
let session_result_sender = self.session_result_sender.clone();
// Taking new_task_receiver here is important to ensure there's a single receiver. In
Expand Down Expand Up @@ -1006,7 +1006,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
};

// The following loop maintains and updates ResultWithTimings as its
// externally-provieded mutable state for each session in this way:
// externally-provided mutable state for each session in this way:
//
// 1. Initial result_with_timing is propagated implicitly by the moved variable.
// 2. Subsequent result_with_timings are propagated explicitly from
Expand Down Expand Up @@ -1062,9 +1062,8 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
Ok(NewTaskPayload::CloseSubchannel) => {
session_ending = true;
}
Ok(NewTaskPayload::OpenSubchannel(_context_and_result_with_timings)) => {
unreachable!();
}
Ok(NewTaskPayload::OpenSubchannel(_context_and_result_with_timings)) =>
unreachable!(),
Err(RecvError) => {
// Mostly likely is that this scheduler is dropped for pruned blocks of
// abandoned forks...
Expand All @@ -1087,17 +1086,16 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
is_finished = session_ending && state_machine.has_no_active_task();
}

if session_ending {
state_machine.reinitialize();
session_result_sender
.send(std::mem::replace(
&mut result_with_timings,
initialized_result_with_timings(),
))
.expect("always outlived receiver");
session_ending = false;
}
// Finalize the current session after asserting it's explicitly requested so.
assert!(session_ending);
// Send result first because this is blocking the replay code-path.
session_result_sender
.send(result_with_timings)
.expect("always outlived receiver");
state_machine.reinitialize();
session_ending = false;

// Prepare for the new session.
match new_task_receiver.recv() {
Ok(NewTaskPayload::OpenSubchannel((
new_context,
Expand All @@ -1111,13 +1109,13 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
.unwrap();
result_with_timings = new_result_with_timings;
}
Ok(_) => {
unreachable!();
}
Err(_) => {
// This unusual condition must be triggered by ThreadManager::drop();
// This unusual condition must be triggered by ThreadManager::drop().
// Initialize result_with_timings with a harmless value...
result_with_timings = initialized_result_with_timings();
break 'nonaborted_main_loop;
}
Ok(_) => unreachable!(),
}
}

Expand Down Expand Up @@ -1212,7 +1210,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
self.scheduler_thread = Some(
thread::Builder::new()
.name("solScheduler".to_owned())
.spawn_tracked(scheduler_main_loop())
.spawn_tracked(scheduler_main_loop)
.unwrap(),
);

Expand Down

0 comments on commit 008a899

Please sign in to comment.