Prioritize blocked task messaging over idle tasks #627

Merged · 11 commits · Apr 18, 2024
182 changes: 170 additions & 12 deletions unified-scheduler-pool/src/lib.rs
@@ -530,9 +530,65 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
}

fn start_threads(&mut self, context: &SchedulingContext) {
let (mut runnable_task_sender, runnable_task_receiver) =
// First, set up bi-directional messaging between the scheduler and handlers to pass
// tasks around, by creating 2 channels (one for to-be-handled tasks from the scheduler to
// the handlers and the other for finished tasks from the handlers to the scheduler).
// Furthermore, this pair of channels is duplicated to work as a primitive 2-level priority
// queue, totalling 4 channels.
//
// This quasi-priority-queue arrangement is desired as an optimization to prioritize
// blocked tasks.
//
// As a quick background, SchedulingStateMachine doesn't throttle runnable tasks at all.
// Thus, to-be-handled tasks are likely to be stalled for an extended duration due to
// excessive buffering (commonly known as buffer bloat). Normally, this buffering isn't
// problematic; it's actually intentional, to fully saturate all the handler threads.
//
// However, there's one caveat: task dependencies. Tasks being blocked is a hint that
// there could be more similarly-blocked tasks in the future. Empirically, clearing these
// linearized long runs of blocking tasks out of the buffer delays bank freezing while
// using only one or two handler threads near the end of the slot, deteriorating
// concurrency.
//
// To alleviate the situation, blocked tasks are exchanged via an independent communication
// pathway as a heuristic. Without prioritization of these tasks, progress on clearing
// them would be severely hampered by interleaved not-blocked tasks (called _idle_
// here; typically, voting transactions) in the single buffer.
//
// A concurrent priority queue isn't used, to avoid a throughput penalty from its higher
// overhead compared to crossbeam channels, even accounting for the doubled processing
// across the crossbeam channels. Fortunately, just 2-level prioritization is enough. Also,
// sticking to crossbeam was easy to implement, and there was no popular and promising
// crate for a concurrent priority queue as of writing.
//
// It's generally harmless for the blocked task buffer to be flooded, stalling the idle
// tasks completely. First, that's unlikely without malice, because every buffered blocked
// task must somehow be independently blocked within its own isolated linearized run, given
// that buffered blocked and idle tasks must not conflict with each other. Furthermore,
// handler threads would still be saturated to the maximum even under such a
// block-verification situation, meaning no remotely-controlled performance degradation.
//
// Overall, while this is merely a heuristic, it is effective and adaptive.
//
// One known caveat, though, is that this heuristic is employed under sub-optimal settings,
// considering scheduling is done in real time. Namely, prioritization enforcement isn't
// immediate when the first of a long run of tasks is buried in the middle of a large idle
// task buffer: prioritization of such a run only kicks in after that first task has been
// handled with the priority of an idle task. To overcome this, some kind of
// re-prioritization or look-ahead mechanism would be needed. However, neither is
// implemented: the former because of implementation complexity, the latter because it
// implies delayed (NOT real-time) processing.
//
// Finally, note that this optimization should be combined with biased selection (i.e.
// `select_biased!`), which isn't done for now... However, consistent performance
// improvement is observed just with this priority queuing alone.
let (mut blocked_task_sender, blocked_task_receiver) =
chained_channel::unbounded::<Task, SchedulingContext>(context.clone());
let (finished_task_sender, finished_task_receiver) = unbounded::<Box<ExecutedTask>>();
let (idle_task_sender, idle_task_receiver) = unbounded::<Task>();
let (finished_blocked_task_sender, finished_blocked_task_receiver) =
unbounded::<Box<ExecutedTask>>();
let (finished_idle_task_sender, finished_idle_task_receiver) =
unbounded::<Box<ExecutedTask>>();

let mut result_with_timings = self.session_result_with_timings.take();

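As a concrete, self-contained illustration of the 4-channel arrangement described in the comment above, here is a hedged sketch using plain crossbeam channels and an unbiased `select!` (the names and message counts are invented; this is not the scheduler's actual wiring, which also involves the chained channel and contexts). The point is that a blocked item enqueued after a large idle backlog only competes with the head of the idle channel, rather than sitting behind the whole FIFO as it would in a single shared channel.

```rust
use crossbeam_channel::{select, unbounded};

fn main() {
    // Two independent FIFOs standing in for the blocked/idle task channels.
    let (blocked_tx, blocked_rx) = unbounded::<u32>();
    let (idle_tx, idle_rx) = unbounded::<u32>();

    // A large idle backlog is enqueued first (think: a burst of votes)...
    for i in 0..1_000u32 {
        idle_tx.send(i).unwrap();
    }
    // ...and then a single "blocked" task arrives.
    blocked_tx.send(9_999).unwrap();

    let mut handled = 0;
    while handled < 1_001 {
        // Unbiased select: one of the *ready* channels is picked at random,
        // so the blocked task becomes eligible right away instead of waiting
        // behind 1_000 idle tasks in a shared queue.
        select! {
            recv(blocked_rx) -> task => {
                if let Ok(task) = task {
                    println!("handling blocked task {task} ahead of the idle backlog");
                    handled += 1;
                }
            },
            recv(idle_rx) -> task => {
                if task.is_ok() {
                    handled += 1; // idle tasks are drained opportunistically
                }
            },
        }
    }
}
```

Note that with plain `select!` the choice among ready channels is still random; as the comment above says, combining this with `select_biased!` is deferred, and the observed improvement comes from the channel split alone.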
@@ -626,7 +682,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
// into busy looping to seek lowest latency eventually. However, not now,
// to measure _actual_ cpu usage easily with the select approach.
select! {
recv(finished_task_receiver) -> executed_task => {
recv(finished_blocked_task_receiver) -> executed_task => {
let executed_task = executed_task.unwrap();

state_machine.deschedule_task(&executed_task.task);
@@ -639,20 +695,20 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
let task = state_machine
.schedule_next_unblocked_task()
.expect("unblocked task");
runnable_task_sender.send_payload(task).unwrap();
blocked_task_sender.send_payload(task).unwrap();
},
recv(new_task_receiver) -> message => {
assert!(!session_ending);

match message.unwrap() {
NewTaskPayload::Payload(task) => {
if let Some(task) = state_machine.schedule_task(task) {
runnable_task_sender.send_payload(task).unwrap();
idle_task_sender.send(task).unwrap();
}
}
NewTaskPayload::OpenSubchannel(context) => {
// signal about new SchedulingContext to handler threads
runnable_task_sender
blocked_task_sender
.send_chained_channel(context, handler_count)
.unwrap();
assert_matches!(
@@ -665,6 +721,13 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
}
}
},
recv(finished_idle_task_receiver) -> executed_task => {
let executed_task = executed_task.unwrap();

state_machine.deschedule_task(&executed_task.task);
let result_with_timings = result_with_timings.as_mut().unwrap();
Self::accumulate_result_with_timings(result_with_timings, executed_task);
},
};

is_finished = session_ending && state_machine.has_no_active_task();
@@ -687,22 +750,32 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {

let handler_main_loop = || {
let pool = self.pool.clone();
let mut runnable_task_receiver = runnable_task_receiver.clone();
let finished_task_sender = finished_task_sender.clone();
let mut blocked_task_receiver = blocked_task_receiver.clone();
let mut idle_task_receiver = idle_task_receiver.clone();
let finished_blocked_task_sender = finished_blocked_task_sender.clone();
let finished_idle_task_sender = finished_idle_task_sender.clone();

move || loop {
let (task, sender) = select! {
recv(runnable_task_receiver.for_select()) -> message => {
if let Some(task) = runnable_task_receiver.after_select(message.unwrap()) {
(task, &finished_task_sender)
recv(blocked_task_receiver.for_select()) -> message => {
if let Some(task) = blocked_task_receiver.after_select(message.unwrap()) {
(task, &finished_blocked_task_sender)
} else {
continue;
}
},
recv(idle_task_receiver) -> task => {
if let Ok(task) = task {
(task, &finished_idle_task_sender)
} else {
idle_task_receiver = never();
continue;
}
},
@ryoqun (Member Author):

here (back ref: solana-labs#34676 (comment))

Reviewer:

I'm not convinced that the multi-channel setup works correctly without select_biased!.

Let's say we have something along the lines of:

  1. blocked_task_sender => new context
  2. idle_task_sender => idle tasks

The idle tasks should be for the new context, but as far as I can tell, there's nothing preventing them from being randomly picked up before the new context in the handler threads.

I do believe this will work with a select_biased! call and the proper ordering, but with the random selection it seems like there's a chance of the chained channel handling getting messed up with respect to the idle tasks.
Maybe I am missing something?

If I am wrong about this, it'd be great to add a test to convince me 😄

@ryoqun (Member Author), Apr 16, 2024:

You're totally correct. You saved me. This is a race condition...: f02ebd8, 4e045d7

> I do believe this will work with a select_biased! call and the proper ordering

Well, this won't work even with select_biased!... It would first try to receive blocked, then idle, and after that it would sched_yield. Before the sched_yield, the handler thread could still see the idle task for the next context, if that task became visible to the thread just after it tried to receive blocked and missed the new context. This means the scheduler thread managed to send the new context and then that context's idle task back-to-back, in between the two try_recvs.

I think this is the root cause of a mysterious panic, which I only observed once while running against mb...
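To make the ordering hazard discussed in this thread easier to see, here is a minimal toy, not the scheduler's real chained-channel code (the `Control` enum and context numbers are invented). Each crossbeam channel is FIFO on its own, but nothing orders deliveries across the two channels, so a handler can observe an idle task belonging to context 2 before it has processed the message announcing that context:

```rust
use crossbeam_channel::unbounded;

enum Control {
    OpenSubchannel(u64), // stand-in for "switch to this new scheduling context"
}

fn main() {
    let (ctrl_tx, ctrl_rx) = unbounded::<Control>();
    let (idle_tx, idle_rx) = unbounded::<&'static str>();

    // Scheduler side: announce the new context first, then send an idle task
    // that belongs to that new context.
    ctrl_tx.send(Control::OpenSubchannel(2)).unwrap();
    idle_tx.send("idle task for context 2").unwrap();

    // Handler side: an unbiased select may poll the idle channel first; that
    // unlucky ordering is simulated here with explicit try_recv calls.
    let mut current_context = 1u64;
    if let Ok(task) = idle_rx.try_recv() {
        println!("handled {task:?} while still on context {current_context}");
    }
    if let Ok(Control::OpenSubchannel(next)) = ctrl_rx.try_recv() {
        current_context = next;
        println!("...only now switched to context {current_context}");
    }
}
```

This only illustrates why per-channel FIFO guarantees aren't enough by themselves; the actual fix is in the commits referenced above.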

};
let mut task = ExecutedTask::new_boxed(task);
Self::execute_task_with_handler(
runnable_task_receiver.context().bank(),
blocked_task_receiver.context().bank(),
&mut task,
&pool.handler_context,
);
@@ -1111,6 +1184,91 @@ mod tests {
);
}

#[test]
fn test_scheduler_schedule_execution_blocked() {
solana_logger::setup();

const STALLED_TRANSACTION_INDEX: usize = 0;
const BLOCKED_TRANSACTION_INDEX: usize = 1;
static LOCK_TO_STALL: Mutex<()> = Mutex::new(());

#[derive(Debug)]
struct StallingHandler;
impl TaskHandler for StallingHandler {
fn handle(
result: &mut Result<()>,
timings: &mut ExecuteTimings,
bank: &Arc<Bank>,
transaction: &SanitizedTransaction,
index: usize,
handler_context: &HandlerContext,
) {
match index {
STALLED_TRANSACTION_INDEX => *LOCK_TO_STALL.lock().unwrap(),
BLOCKED_TRANSACTION_INDEX => {}
_ => unreachable!(),
};
DefaultTaskHandler::handle(
result,
timings,
bank,
transaction,
index,
handler_context,
);
}
}

let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(10_000);

// tx0 and tx1 definitely conflict, since both write-lock the mint address
let tx0 = &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&mint_keypair,
&solana_sdk::pubkey::new_rand(),
2,
genesis_config.hash(),
));
let tx1 = &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&mint_keypair,
&solana_sdk::pubkey::new_rand(),
2,
genesis_config.hash(),
));

let bank = Bank::new_for_tests(&genesis_config);
let bank = setup_dummy_fork_graph(bank);
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool = SchedulerPool::<PooledScheduler<StallingHandler>, _>::new_dyn(
None,
None,
None,
None,
ignored_prioritization_fee_cache,
);
let context = SchedulingContext::new(bank.clone());

assert_eq!(bank.transaction_count(), 0);
let scheduler = pool.take_scheduler(context);

// Stall handling tx0 and tx1
let lock_to_stall = LOCK_TO_STALL.lock().unwrap();
scheduler.schedule_execution(&(tx0, STALLED_TRANSACTION_INDEX));
scheduler.schedule_execution(&(tx1, BLOCKED_TRANSACTION_INDEX));

// Wait a bit for the scheduler thread to decide to block tx1
std::thread::sleep(std::time::Duration::from_secs(1));

// Resume handling by unlocking LOCK_TO_STALL
drop(lock_to_stall);
let bank = BankWithScheduler::new(bank, Some(scheduler));
assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _)));
assert_eq!(bank.transaction_count(), 2);
}
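The `StallingHandler` above hinges on one small trick: the test thread holds a static `Mutex` so the first scheduled transaction parks inside its handler, which guarantees the second, conflicting transaction is still pending (and thus gets classified as blocked) until the lock is released. A stripped-down sketch of just that mechanism, with hypothetical jobs instead of transactions and no scheduler involved:

```rust
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

// Held by the "test" thread to park the worker at a known point.
static LOCK_TO_STALL: Mutex<()> = Mutex::new(());

fn main() {
    // Take the lock before the worker starts, so job 0 stalls immediately.
    let lock_to_stall = LOCK_TO_STALL.lock().unwrap();

    let worker = thread::spawn(|| {
        // Job 0 parks here until the main thread releases the lock...
        drop(LOCK_TO_STALL.lock().unwrap());
        // ...and only then can the conflicting job 1 be processed.
        println!("job 0 finished; job 1 may now run");
    });

    // While the worker is parked, job 1 is guaranteed to still be pending;
    // the test above uses this window (plus a short sleep) so the scheduler
    // thread has time to classify it as blocked.
    thread::sleep(Duration::from_millis(100));
    drop(lock_to_stall);
    worker.join().unwrap();
}
```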

#[derive(Debug)]
struct AsyncScheduler<const TRIGGER_RACE_CONDITION: bool>(
Mutex<ResultWithTimings>,