Skip to content

Commit

Permalink
Prioritize blocked task messaging over idle tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Apr 6, 2024
1 parent c207274 commit 3d1be8d
Showing 1 changed file with 33 additions and 12 deletions.
45 changes: 33 additions & 12 deletions unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,9 +530,13 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
}

fn start_threads(&mut self, context: &SchedulingContext) {
let (mut runnable_task_sender, runnable_task_receiver) =
let (mut blocked_task_sender, blocked_task_receiver) =
chained_channel::unbounded::<Task, SchedulingContext>(context.clone());
let (finished_task_sender, finished_task_receiver) = unbounded::<Box<ExecutedTask>>();
let (idle_task_sender, idle_task_receiver) = unbounded::<Task>();
let (finished_blocked_task_sender, finished_blocked_task_receiver) =
unbounded::<Box<ExecutedTask>>();
let (finished_idle_task_sender, finished_idle_task_receiver) =
unbounded::<Box<ExecutedTask>>();

let mut result_with_timings = self.session_result_with_timings.take();

Expand Down Expand Up @@ -626,7 +630,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
// into busy looping to seek lowest latency eventually. However, not now,
// to measure _actual_ cpu usage easily with the select approach.
select! {
recv(finished_task_receiver) -> executed_task => {
recv(finished_blocked_task_receiver) -> executed_task => {
let executed_task = executed_task.unwrap();

state_machine.deschedule_task(&executed_task.task);
Expand All @@ -639,20 +643,20 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
let task = state_machine
.schedule_next_unblocked_task()
.expect("unblocked task");
runnable_task_sender.send_payload(task).unwrap();
blocked_task_sender.send_payload(task).unwrap();
},
recv(new_task_receiver) -> message => {
assert!(!session_ending);

match message.unwrap() {
NewTaskPayload::Payload(task) => {
if let Some(task) = state_machine.schedule_task(task) {
runnable_task_sender.send_payload(task).unwrap();
idle_task_sender.send(task).unwrap();
}
}
NewTaskPayload::OpenSubchannel(context) => {
// signal about new SchedulingContext to handler threads
runnable_task_sender
blocked_task_sender
.send_chained_channel(context, handler_count)
.unwrap();
assert_matches!(
Expand All @@ -665,6 +669,13 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
}
}
},
recv(finished_idle_task_receiver) -> executed_task => {
let executed_task = executed_task.unwrap();

state_machine.deschedule_task(&executed_task.task);
let result_with_timings = result_with_timings.as_mut().unwrap();
Self::accumulate_result_with_timings(result_with_timings, executed_task);
},
};

is_finished = session_ending && state_machine.has_no_active_task();
Expand All @@ -687,22 +698,32 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {

let handler_main_loop = || {
let pool = self.pool.clone();
let mut runnable_task_receiver = runnable_task_receiver.clone();
let finished_task_sender = finished_task_sender.clone();
let mut blocked_task_receiver = blocked_task_receiver.clone();
let mut idle_task_receiver = idle_task_receiver.clone();
let finished_blocked_task_sender = finished_blocked_task_sender.clone();
let finished_idle_task_sender = finished_idle_task_sender.clone();

move || loop {
let (task, sender) = select! {
recv(runnable_task_receiver.for_select()) -> message => {
if let Some(task) = runnable_task_receiver.after_select(message.unwrap()) {
(task, &finished_task_sender)
recv(blocked_task_receiver.for_select()) -> message => {
if let Some(task) = blocked_task_receiver.after_select(message.unwrap()) {
(task, &finished_blocked_task_sender)
} else {
continue;
}
},
recv(idle_task_receiver) -> task => {
if let Ok(task) = task {
(task, &finished_idle_task_sender)
} else {
idle_task_receiver = never();
continue;
}
},
};
let mut task = ExecutedTask::new_boxed(task);
Self::execute_task_with_handler(
runnable_task_receiver.context().bank(),
blocked_task_receiver.context().bank(),
&mut task,
&pool.handler_context,
);
Expand Down

0 comments on commit 3d1be8d

Please sign in to comment.