Prioritize blocked task messaging over idle tasks #627

Merged: 11 commits, Apr 18, 2024
217 changes: 201 additions & 16 deletions unified-scheduler-pool/src/lib.rs
@@ -15,7 +15,7 @@

use {
assert_matches::assert_matches,
crossbeam_channel::{never, select, unbounded, Receiver, RecvError, SendError, Sender},
crossbeam_channel::{self, never, select, Receiver, RecvError, SendError, Sender},
dashmap::DashMap,
derivative::Derivative,
log::*,
@@ -415,7 +415,7 @@ impl UsageQueueLoader {
// https://github.com/crossbeam-rs/crossbeam/pull/1047)
fn disconnected<T>() -> Receiver<T> {
// drop the sender residing at .0, returning an always-disconnected receiver.
unbounded().1
crossbeam_channel::unbounded().1
}
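
As a small usage sketch of this helper (illustration only, not part of the diff; it relies on the Receiver, RecvError, and assert_matches imports already at the top of this file), receiving from the returned receiver fails immediately instead of blocking:

    let rx: Receiver<u64> = disconnected();
    // The sender was dropped inside disconnected(), so recv() returns an error right away.
    assert_matches!(rx.recv(), Err(RecvError));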

fn initialized_result_with_timings() -> ResultWithTimings {
@@ -466,8 +466,8 @@ impl<TH: TaskHandler> PooledScheduler<TH> {

impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
fn new(pool: Arc<SchedulerPool<S, TH>>) -> Self {
let (new_task_sender, new_task_receiver) = unbounded();
let (session_result_sender, session_result_receiver) = unbounded();
let (new_task_sender, new_task_receiver) = crossbeam_channel::unbounded();
let (session_result_sender, session_result_receiver) = crossbeam_channel::unbounded();
let handler_count = pool.handler_count;

Self {
@@ -530,9 +530,92 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
}

fn start_threads(&mut self, context: &SchedulingContext) {
let (mut runnable_task_sender, runnable_task_receiver) =
// First, set up bi-directional messaging between the scheduler and handlers to pass
// tasks around, by creating 2 channels (one for to-be-handled tasks from the scheduler to
// the handlers and the other for finished tasks from the handlers to the scheduler).
// Furthermore, this pair of channels is duplicated to work as a primitive 2-level priority
// queue, totaling 4 channels.
//
// This quasi-priority-queue arrangement is desired as an optimization to prioritize
// blocked tasks.
//
// As a quick background, SchedulingStateMachine doesn't throttle runnable tasks at all.
// Thus, to-be-handled tasks are likely to stall for extended durations due to
// excessive buffering (commonly known as bufferbloat). Normally, this buffering isn't
// problematic and is actually intentional, to fully saturate all the handler threads.
//
// However, there's one caveat: task dependencies. A task being blocked hints that more
// similarly-blocked tasks are likely to follow. Empirically, clearing these long
// linearized runs of blocking tasks out of the buffer delays bank freezing while using
// only one or two handler threads near the end of the slot, deteriorating the overall
// concurrency.
//
// To alleviate the situation, blocked tasks are exchanged via an independent communication
// pathway as a heuristic for expedited processing. Without prioritization of these tasks,
// progress in clearing these runs would be severely hampered by interleaved
// not-blocked tasks (called _idle_ here; typically, voting transactions) in a single
// buffer.
//
// A concurrent priority queue isn't used, to avoid a throughput penalty from its higher
// overhead compared to the crossbeam channel, even considering the doubled processing of
// the crossbeam channels. Fortunately, just 2-level prioritization is enough. Also,
// sticking to crossbeam was convenient, and there was no popular and promising crate for
// a concurrent priority queue as of this writing.
//
// It's generally harmless for the blocked task buffer to be flooded, stalling the idle
// tasks completely. First, this is unlikely without malice, considering all blocked tasks
// must have been blocked independently for each isolated linearized run. That's because
// all to-be-handled tasks in the blocked and idle buffers must not conflict with each
// other by definition. Furthermore, handler threads would still be saturated to the
// maximum even under such a block-verification situation, meaning no remotely-controlled
// performance degradation.
//
// Overall, while this is merely a heuristic, it's effective and adaptive while not
// vulnerable, merely reusing existing information without any additional runtime cost.
//
// One known caveat, though, is that this heuristic is employed under a sub-optimal
// setting, considering scheduling is done in real time. Namely, prioritization enforcement
// isn't immediate, in the sense that the first task of a long run is buried in the middle
// of a large idle-task buffer. Prioritization of such a run is realized only after the
// first task has been handled with the priority of an idle task. To overcome this, some
// kind of re-prioritization or look-ahead scheduling mechanism would be needed. However,
// neither is implemented: the former due to its implementation complexity and the latter
// due to its delayed (NOT real-time) processing, which is against the unified scheduler
// design goal.
//
// Finally, note that this optimization should be combined with biased select (i.e.
// `select_biased!`), which isn't done for now... However, consistent performance
// improvement is observed just with this priority queuing alone.
//
// Alternatively, more faithful prioritization could be realized by checking the blocking
// statuses of all addresses immediately before sending to the handlers. This would prevent
// false negatives of the heuristic approach (i.e. the last task of a run doesn't need to
// be handled with the higher priority). Note that this is the only improvement compared
// to the heuristic. That's because the underlying information asymmetry between the 2
// approaches doesn't exist for any other case, assuming no look-ahead: idle tasks are
// always unblocked by definition, and other blocked tasks would always be computed as
// blocked by the very existence of the last blocked task.
//
// The faithful approach incurs a considerable overhead: O(N), where N is the number of
// locked addresses in a task, on top of the current bare-minimum complexity of O(2*N) for
// both scheduling and descheduling. This means a 1.5x increase. Furthermore, it doesn't
// work nicely in practice with a real-time streamed scheduler. That's because these
// linearized runs can look intermittent in a view with little or no look-back, while
// actually forming far longer runs over a longer time span. Such access patterns are
// very common, considering the existence of well-known hot accounts.
//
// Thus, intentionally allowing these false positives in the heuristic approach actually
// helps extend the logical prioritization session over the invisible longer runs, as
// long as the last task of the current run is being handled by the handlers, in the hope
// that yet another new blocking task arrives to extend the tentative prioritization
// further. Consequently, this also helps alleviate the heuristic's known caveat for the
// first task of linearized runs, described above.
let (mut blocked_task_sender, blocked_task_receiver) =
chained_channel::unbounded::<Task, SchedulingContext>(context.clone());
let (finished_task_sender, finished_task_receiver) = unbounded::<Box<ExecutedTask>>();
let (idle_task_sender, idle_task_receiver) = crossbeam_channel::unbounded::<Task>();
let (finished_blocked_task_sender, finished_blocked_task_receiver) =
crossbeam_channel::unbounded::<Box<ExecutedTask>>();
let (finished_idle_task_sender, finished_idle_task_receiver) =
crossbeam_channel::unbounded::<Box<ExecutedTask>>();
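
For reference, here is a standalone sketch of the handler-side arrangement described in the comment above (illustration only, not part of the diff: it uses hypothetical Task/Finished types and substitutes a plain channel for the chained context channel the real code uses for blocked tasks):

    use crossbeam_channel::{never, select, Receiver, Sender};

    struct Task(u64);
    struct Finished(u64);

    fn handler_loop(
        blocked_rx: Receiver<Task>,
        mut idle_rx: Receiver<Task>,
        finished_blocked_tx: Sender<Finished>,
        finished_idle_tx: Sender<Finished>,
    ) {
        loop {
            // Unbiased select: each ready channel is picked with equal probability;
            // `select_biased!` favoring the blocked channel would strengthen this further.
            let (task, finished_tx) = select! {
                recv(blocked_rx) -> task => match task {
                    Ok(task) => (task, &finished_blocked_tx),
                    Err(_) => return, // the scheduler dropped its senders
                },
                recv(idle_rx) -> task => match task {
                    Ok(task) => (task, &finished_idle_tx),
                    Err(_) => {
                        // Disable this arm once the idle channel disconnects.
                        idle_rx = never();
                        continue;
                    }
                },
            };
            // ... execute the task here, then report completion on the matching channel
            // so the scheduler can tell blocked completions from idle ones.
            finished_tx.send(Finished(task.0)).unwrap();
        }
    }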

let mut result_with_timings = self.session_result_with_timings.take();

@@ -626,7 +709,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
// into busy looping to seek lowest latency eventually. However, not now,
// to measure _actual_ cpu usage easily with the select approach.
select! {
recv(finished_task_receiver) -> executed_task => {
recv(finished_blocked_task_receiver) -> executed_task => {
let executed_task = executed_task.unwrap();

state_machine.deschedule_task(&executed_task.task);
@@ -639,20 +722,20 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
let task = state_machine
.schedule_next_unblocked_task()
.expect("unblocked task");
runnable_task_sender.send_payload(task).unwrap();
blocked_task_sender.send_payload(task).unwrap();
},
recv(new_task_receiver) -> message => {
assert!(!session_ending);

match message.unwrap() {
NewTaskPayload::Payload(task) => {
if let Some(task) = state_machine.schedule_task(task) {
runnable_task_sender.send_payload(task).unwrap();
idle_task_sender.send(task).unwrap();
}
}
NewTaskPayload::OpenSubchannel(context) => {
// signal about new SchedulingContext to handler threads
runnable_task_sender
blocked_task_sender
.send_chained_channel(context, handler_count)
.unwrap();
assert_matches!(
@@ -665,6 +748,13 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
}
}
},
recv(finished_idle_task_receiver) -> executed_task => {
let executed_task = executed_task.unwrap();

state_machine.deschedule_task(&executed_task.task);
let result_with_timings = result_with_timings.as_mut().unwrap();
Self::accumulate_result_with_timings(result_with_timings, executed_task);
},
};

is_finished = session_ending && state_machine.has_no_active_task();
Expand All @@ -687,22 +777,32 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {

let handler_main_loop = || {
let pool = self.pool.clone();
let mut runnable_task_receiver = runnable_task_receiver.clone();
let finished_task_sender = finished_task_sender.clone();
let mut blocked_task_receiver = blocked_task_receiver.clone();
let mut idle_task_receiver = idle_task_receiver.clone();
let finished_blocked_task_sender = finished_blocked_task_sender.clone();
let finished_idle_task_sender = finished_idle_task_sender.clone();

move || loop {
let (task, sender) = select! {
recv(runnable_task_receiver.for_select()) -> message => {
if let Some(task) = runnable_task_receiver.after_select(message.unwrap()) {
(task, &finished_task_sender)
recv(blocked_task_receiver.for_select()) -> message => {
if let Some(task) = blocked_task_receiver.after_select(message.unwrap()) {
(task, &finished_blocked_task_sender)
} else {
continue;
}
},
recv(idle_task_receiver) -> task => {
if let Ok(task) = task {
(task, &finished_idle_task_sender)
} else {
idle_task_receiver = never();
continue;
}
},
ryoqun (Member Author) commented:

here (back ref: solana-labs#34676 (comment))

apfitzge (reviewer) commented:

I'm not convinced that the multi-channel setup works correctly without select_biased!.

Let's say we have something along the lines of:

  1. blocked_task_sender => new context
  2. idle_task_sender => idle tasks

The idle tasks should be for the new context, but as far as I can tell, there's nothing preventing them from being randomly picked up before the new context in the handler threads.

I do believe this will work with a select_biased! call and the proper ordering, but with the random selection it seems like there's a random chance of the chained-channel state getting messed up with respect to the idle tasks.
Maybe I am missing something?


If I am wrong about this, it'd be great to add a test to convince me 😄

ryoqun (Member Author) commented on Apr 16, 2024:

You're totally correct. You saved me. This is a race condition...: f02ebd8, 4e045d7

> I do believe this will work with a select_biased! call and the proper ordering

Well, this won't work even with select_biased!... It'll first try to receive blocked, then idle. After that, it'll sched_yield. Before the sched_yield, the handler thread could still see the idle task for the next context if that task became visible to the thread just after it tried to receive blocked and missed seeing the new context. This means the scheduler thread managed to send the new context and then that context's idle task successively, between the two try_recvs.

I think this is the root cause of a mysterious panic, which I only observed once while running against mb...
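
To make the reported race concrete, here is a standalone sketch (illustration only, not the scheduler's code; the u64 payloads standing in for context ids are hypothetical): messages sent on two independent crossbeam channels carry no cross-channel ordering guarantee, so an unbiased select! consumer can observe the idle task before the context update that was sent first on the other channel.

    use crossbeam_channel::{select, unbounded};

    fn main() {
        // Stand-ins for the chained context channel and the idle task channel.
        let (ctx_tx, ctx_rx) = unbounded::<u64>();
        let (task_tx, task_rx) = unbounded::<u64>();

        // Scheduler side: send the new context first, then an idle task for that context.
        ctx_tx.send(2).unwrap();
        task_tx.send(2).unwrap();

        // Handler side: with unbiased select, either ready arm may fire first.
        let mut current_ctx = 1;
        for _ in 0..2 {
            select! {
                recv(ctx_rx) -> ctx => current_ctx = ctx.unwrap(),
                recv(task_rx) -> task => {
                    let task = task.unwrap();
                    if task != current_ctx {
                        // Possible outcome: the task for context 2 is picked up while the
                        // handler still believes it is in context 1.
                        println!("race: task for context {task} handled under context {current_ctx}");
                    }
                }
            }
        }
    }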

};
let mut task = ExecutedTask::new_boxed(task);
Self::execute_task_with_handler(
runnable_task_receiver.context().bank(),
blocked_task_receiver.context().bank(),
&mut task,
&pool.handler_context,
);
@@ -1111,6 +1211,91 @@ mod tests {
);
}

#[test]
fn test_scheduler_schedule_execution_blocked() {
solana_logger::setup();

const STALLED_TRANSACTION_INDEX: usize = 0;
const BLOCKED_TRANSACTION_INDEX: usize = 1;
static LOCK_TO_STALL: Mutex<()> = Mutex::new(());

#[derive(Debug)]
struct StallingHandler;
impl TaskHandler for StallingHandler {
fn handle(
result: &mut Result<()>,
timings: &mut ExecuteTimings,
bank: &Arc<Bank>,
transaction: &SanitizedTransaction,
index: usize,
handler_context: &HandlerContext,
) {
match index {
STALLED_TRANSACTION_INDEX => *LOCK_TO_STALL.lock().unwrap(),
BLOCKED_TRANSACTION_INDEX => {}
_ => unreachable!(),
};
DefaultTaskHandler::handle(
result,
timings,
bank,
transaction,
index,
handler_context,
);
}
}

let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(10_000);

// tx0 and tx1 definitely conflict, because both write-lock the mint address
let tx0 = &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&mint_keypair,
&solana_sdk::pubkey::new_rand(),
2,
genesis_config.hash(),
));
let tx1 = &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&mint_keypair,
&solana_sdk::pubkey::new_rand(),
2,
genesis_config.hash(),
));

let bank = Bank::new_for_tests(&genesis_config);
let bank = setup_dummy_fork_graph(bank);
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool = SchedulerPool::<PooledScheduler<StallingHandler>, _>::new_dyn(
None,
None,
None,
None,
ignored_prioritization_fee_cache,
);
let context = SchedulingContext::new(bank.clone());

assert_eq!(bank.transaction_count(), 0);
let scheduler = pool.take_scheduler(context);

// Stall handling tx0 and tx1
let lock_to_stall = LOCK_TO_STALL.lock().unwrap();
scheduler.schedule_execution(&(tx0, STALLED_TRANSACTION_INDEX));
scheduler.schedule_execution(&(tx1, BLOCKED_TRANSACTION_INDEX));

// Wait a bit for the scheduler thread to decide to block tx1
std::thread::sleep(std::time::Duration::from_secs(1));

// Resume handling by unlocking LOCK_TO_STALL
drop(lock_to_stall);
let bank = BankWithScheduler::new(bank, Some(scheduler));
assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _)));
assert_eq!(bank.transaction_count(), 2);
}

#[derive(Debug)]
struct AsyncScheduler<const TRIGGER_RACE_CONDITION: bool>(
Mutex<ResultWithTimings>,