Skip to content

Commit

Permalink
Add helpful comments for peculiar crossbeam usage
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Feb 29, 2024
1 parent 0a85082 commit 89b7773
Showing 1 changed file with 25 additions and 1 deletion.
26 changes: 25 additions & 1 deletion unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,11 @@ impl AddressBook {
}
}

// (this is slow needing atomic mem reads. However, this can be turned into a lot faster
// optimizer-friendly version as shown in this crossbeam pr:
// https://github.com/crossbeam-rs/crossbeam/pull/1047)
fn disconnected<T>() -> Receiver<T> {
// drop the sender residing at .0, returning an always-disconnected receiver.
unbounded().1
}

Expand Down Expand Up @@ -595,6 +599,26 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
loop {
let mut is_finished = false;
while !is_finished {
// ALL recv selectors are eager-evaluated ALWAYS by current crossbeam impl,
// which isn't great and is inconsistent with `if`s in the Rust's match
// arm. So, eagerly binding the result to a variable unconditionally here
// makes no perf. difference...
let dummy_unblocked_task_receiver =
dummy_receiver(state_machine.has_unblocked_task());

// (Assume this is biased; i.e. select_biased! in this crossbeam pr:
// https://github.com/rust-lang/futures-rs/pull/1976)
//
// There's something special called dummy_unblocked_task_receiver here.
// This odd pattern was needed to react to newly unblocked tasks from
// _not-crossbeam-channel_ event sources, precisely at the specified
// precedence among other selectors, while delegating the conrol flow to
// select_biased!.
//
// In this way, hot looping is avoided and overall control flow is much
// consistent. Note that unified scheduler will go
// into busy looping to seek lowest latency eventually. However, not now,
// to measure _actual_ cpu usage easily with the select approach.
select! {
recv(finished_task_receiver) -> executed_task => {
let executed_task = executed_task.unwrap();
Expand All @@ -603,7 +627,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
let result_with_timings = result_with_timings.as_mut().unwrap();
Self::accumulate_result_with_timings(result_with_timings, executed_task);
},
recv(dummy_receiver(state_machine.has_unblocked_task())) -> dummy => {
recv(dummy_unblocked_task_receiver) -> dummy => {
assert_matches!(dummy, Err(RecvError));

let task = state_machine.schedule_unblocked_task().expect("unblocked task");
Expand Down

0 comments on commit 89b7773

Please sign in to comment.