Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prioritize blocked task messaging over idle tasks #627

Merged
merged 11 commits into from
Apr 18, 2024
45 changes: 33 additions & 12 deletions unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,9 +530,13 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
}

fn start_threads(&mut self, context: &SchedulingContext) {
ryoqun marked this conversation as resolved.
Show resolved Hide resolved
let (mut runnable_task_sender, runnable_task_receiver) =
let (mut blocked_task_sender, blocked_task_receiver) =
chained_channel::unbounded::<Task, SchedulingContext>(context.clone());
let (finished_task_sender, finished_task_receiver) = unbounded::<Box<ExecutedTask>>();
let (idle_task_sender, idle_task_receiver) = unbounded::<Task>();
let (finished_blocked_task_sender, finished_blocked_task_receiver) =
unbounded::<Box<ExecutedTask>>();
let (finished_idle_task_sender, finished_idle_task_receiver) =
unbounded::<Box<ExecutedTask>>();

let mut result_with_timings = self.session_result_with_timings.take();

Expand Down Expand Up @@ -626,7 +630,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
// into busy looping to seek lowest latency eventually. However, not now,
// to measure _actual_ cpu usage easily with the select approach.
select! {
recv(finished_task_receiver) -> executed_task => {
recv(finished_blocked_task_receiver) -> executed_task => {
let executed_task = executed_task.unwrap();

state_machine.deschedule_task(&executed_task.task);
Expand All @@ -639,20 +643,20 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
let task = state_machine
.schedule_next_unblocked_task()
.expect("unblocked task");
runnable_task_sender.send_payload(task).unwrap();
blocked_task_sender.send_payload(task).unwrap();
},
recv(new_task_receiver) -> message => {
assert!(!session_ending);

match message.unwrap() {
NewTaskPayload::Payload(task) => {
if let Some(task) = state_machine.schedule_task(task) {
runnable_task_sender.send_payload(task).unwrap();
idle_task_sender.send(task).unwrap();
}
}
NewTaskPayload::OpenSubchannel(context) => {
// signal about new SchedulingContext to handler threads
runnable_task_sender
blocked_task_sender
.send_chained_channel(context, handler_count)
.unwrap();
assert_matches!(
Expand All @@ -665,6 +669,13 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
}
}
},
recv(finished_idle_task_receiver) -> executed_task => {
let executed_task = executed_task.unwrap();

state_machine.deschedule_task(&executed_task.task);
let result_with_timings = result_with_timings.as_mut().unwrap();
Self::accumulate_result_with_timings(result_with_timings, executed_task);
},
};

is_finished = session_ending && state_machine.has_no_active_task();
Expand All @@ -687,22 +698,32 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {

let handler_main_loop = || {
let pool = self.pool.clone();
let mut runnable_task_receiver = runnable_task_receiver.clone();
let finished_task_sender = finished_task_sender.clone();
let mut blocked_task_receiver = blocked_task_receiver.clone();
let mut idle_task_receiver = idle_task_receiver.clone();
let finished_blocked_task_sender = finished_blocked_task_sender.clone();
let finished_idle_task_sender = finished_idle_task_sender.clone();

move || loop {
let (task, sender) = select! {
recv(runnable_task_receiver.for_select()) -> message => {
if let Some(task) = runnable_task_receiver.after_select(message.unwrap()) {
(task, &finished_task_sender)
recv(blocked_task_receiver.for_select()) -> message => {
if let Some(task) = blocked_task_receiver.after_select(message.unwrap()) {
(task, &finished_blocked_task_sender)
} else {
continue;
}
},
recv(idle_task_receiver) -> task => {
if let Ok(task) = task {
(task, &finished_idle_task_sender)
} else {
idle_task_receiver = never();
continue;
}
},
Copy link
Member Author

@ryoqun ryoqun Apr 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here (back ref: solana-labs#34676 (comment))

};
let mut task = ExecutedTask::new_boxed(task);
Self::execute_task_with_handler(
runnable_task_receiver.context().bank(),
blocked_task_receiver.context().bank(),
&mut task,
&pool.handler_context,
);
Expand Down
Loading